zinx-cluster 多进程,多服务器完善
zinx-cluster 多进程,多服务器完善
前言
zinx是一款优秀的基于Golang轻量级并发服务器框架,但是框架不支持分布式,zinx-cluster基于zinx的分布式扩展
zinx-cluster设计请查看之前的文章,之前只能做到单进程多zinx服务,这次完善多进程,多服务器分布式部署
内部客户端
修改 dclient/client.go
func Client(group ddict.GroupName, NodeId int) ziface.IConnection {
...
// 启动client
connChannl := make(chan ziface.IConnection, 1)
go clientSatrt(c.IP, c.Port, connChannl, c.ID) // 增加一个nodeid参数
dmanager.ClientConnManager.Add(c.ID, <-connChannl)
conn, _ = dmanager.ClientConnManager.Get(c.ID)
return conn
}
func clientSatrt(ip string, port int, connChannl chan ziface.IConnection, nodeId int) ziface.IConnection {// 增加一个nodeid参数
...
client.SetDecoder(nil)
// 删除失效链接 新增
client.SetOnConnStop(func(conn ziface.IConnection) {
fmt.Println("内部客户端删除链接")
dmanager.ClientConnManager.Remove(nodeId)
})
client.Start()
select {}
}
字典
....
type NodeInfo struct {
ID int // node id
Name string // node 名称
Type int // node类型 1.gateway 2.backend
IP string // IP
Port int // 端口号
}
// 新增
type NodeDict struct {
ID int // node id
Name string // node 名称
Type int // node类型 1.gateway 2.backend
IP string // IP
Port int // 端口号
GroupName GroupName
}
...
type NodeGroupMap = map[GroupName]NodeSlice
type NodeDicts = map[int]*NodeDict // 新增
...
// Route字典
func (d *Dict) GetRouteDicts() RouteDicts {
r := make(RouteDicts)
for groupName, group := range d.RouteGroup {
for _, v := range group {
r[v.MsgID] = &RouteDict{
MsgID: v.MsgID,
Name: v.Name,
GroupName: groupName,
}
}
}
return r
}
// node字典 新增
func (d *Dict) GetNodeDicts() NodeDicts {
r := make(NodeDicts)
for groupName, group := range d.NodeGroup {
for _, v := range group {
r[v.ID] = &NodeDict{
ID: v.ID,
Name: v.Name,
Type: v.Type,
IP: v.IP,
Port: v.Port,
GroupName: groupName,
}
}
}
return r
}
多进程demo
扩展修改完成后,完成一个多进程的demo
demo目录
multiProcess
|____gate # 可以看成一个zinx项目
| |____gate.go # zinx的入口文件
| |____conf
| | |____zinx.json # zinx的配置文件
| |____handlers # zinx 业务代码
| | |____handler.go
|____im # 可以看成一个zinx项目
| |____im.go # zinx的入口文件
| |____conf
| | |____zinx.json # zinx的配置文件
| |____handlers # zinx 业务代码
| | |____handler.go
|____conf # zinx-cluster的配置文件
| |____dinx.json # zinx-cluster Node配置文件
| |____route.json # zinx-cluster 路由配置文件
|____client # 客户端
| |____main.go
|____main.go # 项目启动入口
从目录可以看出,zinx-cluster通过dinx的配置把多个zinx项目组合成一个分布式项目
项目说明
- zinx-cluster配置文件
// dinx.json 有两组类型node(gate,im),gate有一台服务,im有两台
{
"gate": [
{"ID": 1, "Name": "gate-1", "Type": 1, "IP": "0.0.0.0", "Port": 10000, "IsPass": 1}
],
"im": [
{"ID": 2, "Name": "im-1", "Type": 2, "IP": "0.0.0.0", "Port": 20000, "IsPass": 1},
{"ID": 3, "Name": "im-2", "Type": 2, "IP": "0.0.0.0", "Port": 21000, "IsPass": 1}
]
}
// route.json,gate有两个路由 im也有两个路由
{
"gate": [
{"MsgID":1,"Name":"登录"},
{"MsgID":2,"Name":"退出"}
],
"im": [
{"MsgID":3, "Name": "单送"},
{"MsgID":4,"Name":"群发"}
]
}
- zinx-cluster入口文件
package main
import (
"flag"
"fmt"
"os"
"os/signal"
"github.com/aceld/zinx/ziface"
"github.com/timzzx/zinx-cluster/dconf"
"github.com/timzzx/zinx-cluster/ddict"
"github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/gate"
"github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/im"
)
func main() {
// 获取需要启动的NodeID,默认一个进程启动所有服务,否则按照nodeid来启动服务
nodeId := flag.Int("id", 0, "id")
flag.Parse()
// Node集合
apps := make(map[string]func(n *ddict.NodeInfo, groupName ddict.GroupName) ziface.IServer, 0)
apps["gate"] = gate.App // 加载gate zinx项目加载到zinx-cluster
apps["im"] = im.App // 加载im zinx项目加载到zinx-cluster
var servers []ziface.IServer
// 获取配置
c := dconf.Dicts.NodeList()
if *nodeId != 0 {
for k, v := range c {
for _, n := range v {
if *nodeId == n.ID {
s := apps[k](n, k)
servers = append(servers, s)
// 启动
go s.Serve()
}
}
}
} else {
for k, v := range c {
for _, n := range v {
s := apps[k](n, k)
servers = append(servers, s)
// 启动
go s.Serve()
}
}
}
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt) // TODO kill -9 mac上有点问题
sig := <-exit
fmt.Println("===exit===", sig)
for _, node := range servers {
node.Stop()
}
}
- gate入口文件
package gate
import (
"fmt"
"github.com/aceld/zinx/zconf"
"github.com/aceld/zinx/zdecoder"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
"github.com/timzzx/zinx-cluster/ddict"
"github.com/timzzx/zinx-cluster/dmanager"
"github.com/timzzx/zinx-cluster/dnode"
"github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/gate/handlers"
)
func App(n *ddict.NodeInfo, groupName ddict.GroupName) ziface.IServer {
// 设置zinx配置
config := &zconf.Config{
Name: n.Name,
Host: n.IP,
TCPPort: n.Port,
WsPort: n.Port + 1,
// Mode: "websocket",
RouterSlicesMode: true,
LogIsolationLevel: 2,
}
s := znet.NewUserConfServer(config)
// 前端
if n.Type == 1 {
fmt.Println("开启前端拦截器")
// 启动数据拦截
s.AddInterceptor(&zdecoder.TLVDecoder{})
s.AddInterceptor(&dnode.NodeInterceptor{Node: n, GroupName: groupName})
// 关闭默认的解码器 因为提前解码获取参数,所以后续的解码拦截器要关闭,不然会重复解码报错
s.SetDecoder(nil)
}
// 删除失效链接
s.SetOnConnStop(func(conn ziface.IConnection) {
fmt.Println("删除失效连接")
dmanager.MemberConnManager.Remove(conn)
})
// handlers (这里就是gate路由业务加载的地方)
s.AddRouterSlices(1, handlers.Login)
s.AddRouterSlices(2, handlers.Logout)
return s
}
- gate具体业务写在handlers下
package handlers
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/timzzx/zinx-cluster/dmessage"
)
// 登录
func Login(request ziface.IRequest) {
fmt.Println("服务器名称:", request.GetConnection().GetName())
fmt.Println("接收消息id:", request.GetMsgID(), "[登录成功]")
decode, _ := dmessage.Decode(request.GetData())
data, _ := dmessage.Encode(decode.ConnID, 0, "", 2, []byte("登录成功"))
request.GetConnection().SendMsg(1, data)
}
// 退出
func Logout(request ziface.IRequest) {
fmt.Println("服务器名称:", request.GetConnection().GetName())
fmt.Println("接收消息id:", request.GetMsgID(), "[退出成功]")
decode, _ := dmessage.Decode(request.GetData())
data, _ := dmessage.Encode(decode.ConnID, 0, "", 2, []byte("退出成功"))
request.GetConnection().SendMsg(2, data)
}
客户端
package main
import (
"fmt"
"time"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/znet"
)
// 客户端自定义业务
func loginLoop(conn ziface.IConnection) {
err := conn.SendMsg(1, []byte("登录[FromClient]"))
if err != nil {
fmt.Println(err)
return
}
time.Sleep(1 * time.Second)
err = conn.SendMsg(2, []byte("退出[FromClient]"))
if err != nil {
fmt.Println(err)
return
}
time.Sleep(1 * time.Second)
err = conn.SendMsg(3, []byte("单发[FromClient]"))
if err != nil {
fmt.Println(err)
return
}
time.Sleep(1 * time.Second)
err = conn.SendMsg(4, []byte("群发[FromClient]"))
if err != nil {
fmt.Println(err)
return
}
}
// 创建连接的时候执行
func onClientStart(conn ziface.IConnection) {
fmt.Println("onClientStart is Called ... ")
go loginLoop(conn)
}
type LoginRouter struct {
znet.BaseRouter
}
func (this *LoginRouter) Handle(request ziface.IRequest) {
fmt.Println("登录成功")
}
type LogoutRouter struct {
znet.BaseRouter
}
func (this *LogoutRouter) Handle(request ziface.IRequest) {
fmt.Println("退出成功")
}
type SendRouter struct {
znet.BaseRouter
}
func (this *SendRouter) Handle(request ziface.IRequest) {
fmt.Println("单发成功")
}
type FullSendRouter struct {
znet.BaseRouter
}
func (this *FullSendRouter) Handle(request ziface.IRequest) {
fmt.Println("全发成功")
}
func main() {
//创建Client客户端
client := znet.NewClient("127.0.0.1", 10000)
//设置链接建立成功后的钩子函数
client.SetOnConnStart(onClientStart)
client.AddRouter(1, &LoginRouter{})
client.AddRouter(2, &LogoutRouter{})
client.AddRouter(3, &SendRouter{})
client.AddRouter(4, &FullSendRouter{})
//启动客户端
client.Start()
//防止进程退出,等待中断信号
select {}
}
启动业务测试
服务器单进程启动三个业务 gate-1 im-1 im-2
-> multiProcess git:(main) ✗ go run main.go
output:
██
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 20000
WsPort: 20001
Name: im-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
==============================
██
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 21000
WsPort: 21001
Name: im-2
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
==============================
██
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 10000
WsPort: 10001
Name: gate-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
==============================
开启前端拦截器
客户端运行
go run client/main.go
output:
2024/04/01 23:27:10 [INFO]connection.go:213: [Reader Goroutine is running]
2024/04/01 23:27:10 [DEBUG]connection.go:239: read buffer 00000001000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001010304010ce799bbe5bd95e68890e58a9f00
登录成功
2024/04/01 23:27:11 [DEBUG]connection.go:239: read buffer 00000002000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001010304010ce98080e587bae68890e58a9f00
退出成功
2024/04/01 23:27:13 [DEBUG]connection.go:239: read buffer 000000040000000b46756c6c53656e642e2e2e
全发成功
在运行一个client
onClientStart is Called ...
2024/04/01 23:27:58 [INFO]connection.go:213: [Reader Goroutine is running]
2024/04/01 23:27:58 [DEBUG]connection.go:239: read buffer 00000001000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001020304010ce799bbe5bd95e68890e58a9f00
登录成功
2024/04/01 23:27:59 [DEBUG]connection.go:239: read buffer 00000002000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001020304010ce98080e587bae68890e58a9f00
退出成功
2024/04/01 23:28:00 [DEBUG]connection.go:239: read buffer 000000030000000753656e642e2e2e
单发成功
2024/04/01 23:28:01 [DEBUG]connection.go:239: read buffer 000000040000000b46756c6c53656e642e2e2e
全发成功
服务器多进程启动三个业务 gate-1 im-1 im-2
gate-1
multiProcess git:(main) ✗ go run main.go -id=1
output:
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] github.com/aceld │
│ [tutorial] www.yuque.com/aceld/npyr8s/bgftov │
│ [document] www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 10000
WsPort: 10001
Name: gate-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
开启前端拦截器
im-1
multiProcess git:(main) ✗ go run main.go -id=2
output:
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] github.com/aceld │
│ [tutorial] www.yuque.com/aceld/npyr8s/bgftov │
│ [document] www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 20000
WsPort: 20001
Name: im-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
im-2
multiProcess git:(main) ✗ go run main.go -id=3
output:
██
▀▀
████████ ████ ██▄████▄ ▀██ ██▀
▄█▀ ██ ██▀ ██ ████
▄█▀ ██ ██ ██ ▄██▄
▄██▄▄▄▄▄ ▄▄▄██▄▄▄ ██ ██ ▄█▀▀█▄
▀▀▀▀▀▀▀▀ ▀▀▀▀▀▀▀▀ ▀▀ ▀▀ ▀▀▀ ▀▀▀
┌──────────────────────────────────────────────────────┐
│ [Github] github.com/aceld │
│ [tutorial] www.yuque.com/aceld/npyr8s/bgftov │
│ [document] www.yuque.com/aceld/tsgooa │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 21000
WsPort: 21001
Name: im-2
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile:
PrivateKeyFile:
```
这样就是可以多进程,或者多服务器进行部署了。
总结
现在zinx-cluster可以做到 单进程启动多服务 和 多进程多服务器启动
使用注意
- rpc需要使用者自己去选择,zinx-cluster不准备集成rpc
- 多前端和多进程的项目,用户连接需要使用外部数据库来完成
本作品采用《CC 协议》,转载必须注明作者和本文链接