zinx-cluster: completing multi-process and multi-server support

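Preface

zinx is an excellent lightweight concurrent server framework written in Golang, but the framework itself does not support distributed deployment. zinx-cluster is a distributed extension built on top of zinx.

Code on GitHub

For the design of zinx-cluster, see the previous article. Until now it could only run multiple zinx services inside a single process; this update completes multi-process and multi-server distributed deployment.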

Internal client

Modify dclient/client.go

func Client(group ddict.GroupName, NodeId int) ziface.IConnection {
    ...
    // Start the client
    connChannl := make(chan ziface.IConnection, 1)
    go clientSatrt(c.IP, c.Port, connChannl, c.ID) // new: pass the node ID as an extra argument
    dmanager.ClientConnManager.Add(c.ID, <-connChannl)
    conn, _ = dmanager.ClientConnManager.Get(c.ID)
    return conn
}

func clientSatrt(ip string, port int, connChannl chan ziface.IConnection, nodeId int) ziface.IConnection { // new: nodeId parameter
    ...
    client.SetDecoder(nil)
    // New: remove the stale connection from the manager when it stops
    client.SetOnConnStop(func(conn ziface.IConnection) {
        fmt.Println("内部客户端删除链接") // "internal client removed the connection"
        dmanager.ClientConnManager.Remove(nodeId)
    })
    client.Start()
    select {}
}
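
To make the effect of the new OnConnStop hook concrete, here is a minimal, self-contained sketch of a connection cache keyed by node ID. ConnManager, Conn and fakeConn are hypothetical stand-ins written for this illustration; they are not the real dmanager.ClientConnManager or ziface.IConnection, whose internals are not shown in this article.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a stand-in for ziface.IConnection so the sketch compiles on its own.
type Conn interface {
	Name() string
}

type fakeConn struct{ name string }

func (f fakeConn) Name() string { return f.name }

// ConnManager maps a node ID to the connection currently open to that node.
type ConnManager struct {
	mu    sync.RWMutex
	conns map[int]Conn
}

func NewConnManager() *ConnManager {
	return &ConnManager{conns: make(map[int]Conn)}
}

// Add caches the connection for a node, replacing any previous entry.
func (m *ConnManager) Add(nodeID int, c Conn) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.conns[nodeID] = c
}

// Get returns the cached connection for a node, if there is one.
func (m *ConnManager) Get(nodeID int) (Conn, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	c, ok := m.conns[nodeID]
	return c, ok
}

// Remove drops the cached connection, for example after it has stopped.
func (m *ConnManager) Remove(nodeID int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.conns, nodeID)
}

func main() {
	m := NewConnManager()
	m.Add(2, fakeConn{name: "im-1"})

	// Simulates the stop hook firing for node 2.
	onStop := func(nodeID int) { m.Remove(nodeID) }
	onStop(2)

	_, ok := m.Get(2)
	fmt.Println("still cached:", ok) // prints "still cached: false"
}
```

Without the removal step, a later lookup for node 2 would hand back a dead connection instead of triggering a fresh dial, which is presumably why the hook needs the node ID.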

Dictionary

....
type NodeInfo struct {
    ID   int    // node ID
    Name string // node name
    Type int    // node type: 1 = gateway, 2 = backend
    IP   string // IP address
    Port int    // port
}

// New
type NodeDict struct {
    ID        int    // node ID
    Name      string // node name
    Type      int    // node type: 1 = gateway, 2 = backend
    IP        string // IP address
    Port      int    // port
    GroupName GroupName
}
...
type NodeGroupMap = map[GroupName]NodeSlice
type NodeDicts = map[int]*NodeDict // new
...
// Route dictionary
func (d *Dict) GetRouteDicts() RouteDicts {
    r := make(RouteDicts)

    for groupName, group := range d.RouteGroup {
        for _, v := range group {
            r[v.MsgID] = &RouteDict{
                MsgID:     v.MsgID,
                Name:      v.Name,
                GroupName: groupName,
            }
        }
    }
    return r
}

// Node dictionary (new)
func (d *Dict) GetNodeDicts() NodeDicts {
    r := make(NodeDicts)
    for groupName, group := range d.NodeGroup {
        for _, v := range group {
            r[v.ID] = &NodeDict{
                ID:        v.ID,
                Name:      v.Name,
                Type:      v.Type,
                IP:        v.IP,
                Port:      v.Port,
                GroupName: groupName,
            }

        }
    }
    return r
}
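
The point of the node dictionary is that a node can be found by its ID alone, without knowing its group first. The following standalone sketch shows that lookup. The types are redeclared locally (GroupName is assumed to be a string type here), and buildNodeDicts and addrOf are hypothetical helper names used only for illustration.

```go
package main

import "fmt"

// Local copies of the shapes shown above, so the sketch compiles on its own.
type GroupName string

type NodeInfo struct {
	ID   int
	Name string
	Type int
	IP   string
	Port int
}

type NodeDict struct {
	ID        int
	Name      string
	Type      int
	IP        string
	Port      int
	GroupName GroupName
}

// buildNodeDicts flattens the group -> nodes map into an ID -> node map,
// remembering which group each node came from.
func buildNodeDicts(groups map[GroupName][]*NodeInfo) map[int]*NodeDict {
	dicts := make(map[int]*NodeDict)
	for group, nodes := range groups {
		for _, n := range nodes {
			dicts[n.ID] = &NodeDict{
				ID: n.ID, Name: n.Name, Type: n.Type,
				IP: n.IP, Port: n.Port, GroupName: group,
			}
		}
	}
	return dicts
}

// addrOf returns "ip:port" for a node ID, or false if the ID is unknown.
func addrOf(dicts map[int]*NodeDict, id int) (string, bool) {
	d, ok := dicts[id]
	if !ok {
		return "", false
	}
	return fmt.Sprintf("%s:%d", d.IP, d.Port), true
}

func main() {
	groups := map[GroupName][]*NodeInfo{
		"gate": {{ID: 1, Name: "gate-1", Type: 1, IP: "0.0.0.0", Port: 10000}},
		"im": {
			{ID: 2, Name: "im-1", Type: 2, IP: "0.0.0.0", Port: 20000},
			{ID: 3, Name: "im-2", Type: 2, IP: "0.0.0.0", Port: 21000},
		},
	}
	dicts := buildNodeDicts(groups)
	addr, _ := addrOf(dicts, 3)
	fmt.Println(dicts[3].GroupName, addr) // prints "im 0.0.0.0:21000"
}
```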

Multi-process demo

With the extension changes in place, let's build a multi-process demo.

Demo directory layout

multiProcess
|____gate # can be treated as a zinx project
| |____gate.go # zinx entry file
| |____conf
| | |____zinx.json # zinx configuration file
| |____handlers # zinx business code
| | |____handler.go
|____im  # can be treated as a zinx project
| |____im.go # zinx entry file
| |____conf
| | |____zinx.json # zinx configuration file
| |____handlers # zinx business code
| | |____handler.go
|____conf # zinx-cluster configuration files
| |____dinx.json # zinx-cluster node configuration
| |____route.json # zinx-cluster route configuration
|____client # client
| |____main.go
|____main.go # project entry point

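As the directory layout shows, zinx-cluster uses the dinx configuration to combine several zinx projects into a single distributed project.

Project walkthrough

  • zinx-cluster configuration files
// dinx.json defines two node groups (gate and im): gate has one server, im has two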
{
    "gate": [
        {"ID": 1, "Name": "gate-1", "Type": 1, "IP": "0.0.0.0", "Port": 10000, "IsPass": 1}
    ],
    "im": [
        {"ID": 2, "Name": "im-1", "Type": 2, "IP": "0.0.0.0", "Port": 20000, "IsPass": 1},
        {"ID": 3, "Name": "im-2", "Type": 2, "IP": "0.0.0.0", "Port": 21000, "IsPass": 1}
    ]
}
// route.json: gate has two routes (登录 = login, 退出 = logout) and im has two (单送 = direct send, 群发 = broadcast)
{
    "gate": [
        {"MsgID":1,"Name":"登录"},
        {"MsgID":2,"Name":"退出"}
    ],
    "im": [
        {"MsgID":3, "Name": "单送"},
        {"MsgID":4,"Name":"群发"}
    ]
}
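
As a rough illustration of how a dinx.json-shaped file can be read, the sketch below parses the same JSON with encoding/json and rejects duplicate node IDs, since the -id flag and the node dictionary both rely on IDs being unique. parseNodes is a hypothetical helper written for this example; how dconf actually loads its files is not shown in this article. The IsPass field is not part of the struct here, so it is simply ignored.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// NodeInfo carries only the fields this sketch needs.
type NodeInfo struct {
	ID   int
	Name string
	Type int
	IP   string
	Port int
}

// sampleDinx repeats the dinx.json content from this demo.
const sampleDinx = `{
    "gate": [
        {"ID": 1, "Name": "gate-1", "Type": 1, "IP": "0.0.0.0", "Port": 10000, "IsPass": 1}
    ],
    "im": [
        {"ID": 2, "Name": "im-1", "Type": 2, "IP": "0.0.0.0", "Port": 20000, "IsPass": 1},
        {"ID": 3, "Name": "im-2", "Type": 2, "IP": "0.0.0.0", "Port": 21000, "IsPass": 1}
    ]
}`

// parseNodes decodes group -> nodes and fails if two nodes share an ID.
func parseNodes(raw []byte) (map[string][]NodeInfo, error) {
	var groups map[string][]NodeInfo
	if err := json.Unmarshal(raw, &groups); err != nil {
		return nil, fmt.Errorf("invalid node config: %w", err)
	}
	seen := make(map[int]string)
	for _, nodes := range groups {
		for _, n := range nodes {
			if prev, dup := seen[n.ID]; dup {
				return nil, fmt.Errorf("node ID %d is used by both %q and %q", n.ID, prev, n.Name)
			}
			seen[n.ID] = n.Name
		}
	}
	return groups, nil
}

func main() {
	groups, err := parseNodes([]byte(sampleDinx))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(groups["gate"]), len(groups["im"])) // prints "1 2"
}
```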
  • zinx-cluster entry file
package main

import (
    "flag"
    "fmt"
    "os"
    "os/signal"

    "github.com/aceld/zinx/ziface"
    "github.com/timzzx/zinx-cluster/dconf"
    "github.com/timzzx/zinx-cluster/ddict"
    "github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/gate"
    "github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/im"
)

func main() {
    // Node ID to start. By default (0) one process starts every service; otherwise only the node with this ID is started
    nodeId := flag.Int("id", 0, "id")
    flag.Parse()

    // Registry of node groups
    apps := make(map[string]func(n *ddict.NodeInfo, groupName ddict.GroupName) ziface.IServer, 0)
    apps["gate"] = gate.App // register the gate zinx project with zinx-cluster
    apps["im"] = im.App     // register the im zinx project with zinx-cluster

    var servers []ziface.IServer
    // Load the node configuration
    c := dconf.Dicts.NodeList()
    if *nodeId != 0 {
        for k, v := range c {
            for _, n := range v {
                if *nodeId == n.ID {
                    s := apps[k](n, k)
                    servers = append(servers, s)
                    // Start serving
                    go s.Serve()
                }
            }
        }
    } else {
        for k, v := range c {
            for _, n := range v {
                s := apps[k](n, k)
                servers = append(servers, s)
                // Start serving
                go s.Serve()
            }
        }
    }

    exit := make(chan os.Signal, 1)
    signal.Notify(exit, os.Interrupt) // TODO: kill -9 misbehaves on macOS (note that SIGKILL cannot be caught by any process)
    sig := <-exit
    fmt.Println("===exit===", sig)
    for _, node := range servers {
        node.Stop()
    }
}
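
The two loops in main differ only in the ID check, so the selection step can be folded into one function. This is only a sketch of one possible refactoring, not the project's code: Node and selectNodes are hypothetical names, and returning an error for an unknown ID is an added assumption (the listing above would start nothing and just wait for a signal).

```go
package main

import (
	"fmt"
	"sort"
)

// Node is a reduced stand-in for ddict.NodeInfo plus its group name.
type Node struct {
	ID    int
	Name  string
	Group string
}

// selectNodes returns every node when id is 0, otherwise only the node
// whose ID matches. The result is sorted by ID so start order is stable.
func selectNodes(groups map[string][]Node, id int) ([]Node, error) {
	var out []Node
	for group, nodes := range groups {
		for _, n := range nodes {
			if id == 0 || n.ID == id {
				n.Group = group
				out = append(out, n)
			}
		}
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("no node matches id %d", id)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out, nil
}

func main() {
	groups := map[string][]Node{
		"gate": {{ID: 1, Name: "gate-1"}},
		"im":   {{ID: 2, Name: "im-1"}, {ID: 3, Name: "im-2"}},
	}
	for _, id := range []int{0, 2} {
		nodes, err := selectNodes(groups, id)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("-id=%d starts %d node(s)\n", id, len(nodes))
	}
}
```

The caller would then loop over the returned slice once, look up the app constructor by group, and start each server.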
  • gate entry file
package gate

import (
    "fmt"

    "github.com/aceld/zinx/zconf"
    "github.com/aceld/zinx/zdecoder"
    "github.com/aceld/zinx/ziface"
    "github.com/aceld/zinx/znet"
    "github.com/timzzx/zinx-cluster/ddict"
    "github.com/timzzx/zinx-cluster/dmanager"
    "github.com/timzzx/zinx-cluster/dnode"
    "github.com/timzzx/zinx-cluster/zinx-cluster-demo/multiProcess/gate/handlers"
)

func App(n *ddict.NodeInfo, groupName ddict.GroupName) ziface.IServer {
    // Build the zinx configuration
    config := &zconf.Config{
        Name:    n.Name,
        Host:    n.IP,
        TCPPort: n.Port,
        WsPort:  n.Port + 1,
        // Mode:             "websocket",
        RouterSlicesMode:  true,
        LogIsolationLevel: 2,
    }
    s := znet.NewUserConfServer(config)
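    // Front-end (gateway) node
    if n.Type == 1 {
        fmt.Println("开启前端拦截器") // "front-end interceptors enabled"
        // Enable data interception
        s.AddInterceptor(&zdecoder.TLVDecoder{})
        s.AddInterceptor(&dnode.NodeInterceptor{Node: n, GroupName: groupName})
        // Disable the default decoder: the data has already been decoded above to read its parameters,
        // so leaving the later decoder in place would decode it a second time and fail
        s.SetDecoder(nil)
    }
    // Remove stale connections
    s.SetOnConnStop(func(conn ziface.IConnection) {
        fmt.Println("删除失效连接") // "stale connection removed"
        dmanager.MemberConnManager.Remove(conn)
    })

    // handlers (this is where the gate routes are registered)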
    s.AddRouterSlices(1, handlers.Login)
    s.AddRouterSlices(2, handlers.Logout)

    return s
}
  • gate business logic lives under handlers
package handlers

import (
    "fmt"

    "github.com/aceld/zinx/ziface"
    "github.com/timzzx/zinx-cluster/dmessage"
)

// Login handles the login message (MsgID 1)
func Login(request ziface.IRequest) {
    fmt.Println("服务器名称:", request.GetConnection().GetName())
    fmt.Println("接收消息id:", request.GetMsgID(), "[登录成功]")
    decode, _ := dmessage.Decode(request.GetData())
    data, _ := dmessage.Encode(decode.ConnID, 0, "", 2, []byte("登录成功"))
    request.GetConnection().SendMsg(1, data)
}

// Logout handles the logout message (MsgID 2)
func Logout(request ziface.IRequest) {
    fmt.Println("服务器名称:", request.GetConnection().GetName())
    fmt.Println("接收消息id:", request.GetMsgID(), "[退出成功]")

    decode, _ := dmessage.Decode(request.GetData())
    data, _ := dmessage.Encode(decode.ConnID, 0, "", 2, []byte("退出成功"))
    request.GetConnection().SendMsg(2, data)
}
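
The handlers wrap their reply with dmessage.Encode and unwrap the request with dmessage.Decode. The client's read buffer dumps in the test output contain the names NodeMessage, ConnID, NodeID, NodeGroup, Type and Data, which suggests an envelope struct serialized with encoding/gob. The sketch below is a guess at that shape, not the package's actual code: the field types (in particular uint64 for ConnID) and the encode/decode helpers are assumptions.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// NodeMessage is an assumed envelope: which client connection the payload
// belongs to, which node and group it targets, a type tag, and the payload.
type NodeMessage struct {
	ConnID    uint64
	NodeID    int
	NodeGroup string
	Type      int
	Data      []byte
}

// encode serializes the envelope with gob.
func encode(m NodeMessage) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(m); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decode restores an envelope produced by encode.
func decode(raw []byte) (NodeMessage, error) {
	var m NodeMessage
	err := gob.NewDecoder(bytes.NewReader(raw)).Decode(&m)
	return m, err
}

func main() {
	raw, err := encode(NodeMessage{ConnID: 1, Type: 2, Data: []byte("登录成功")})
	if err != nil {
		fmt.Println("encode error:", err)
		return
	}
	msg, err := decode(raw)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(msg.ConnID, msg.Type, string(msg.Data)) // prints "1 2 登录成功"
}
```

Carrying ConnID inside the envelope is what lets a reply produced by a backend find its way to the right client connection on the gateway. Unlike the handlers above, the sketch checks the errors instead of discarding them.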

Client

package main

import (
    "fmt"
    "time"

    "github.com/aceld/zinx/ziface"
    "github.com/aceld/zinx/znet"
)

// Custom client-side logic: send one message per route, one second apart
func loginLoop(conn ziface.IConnection) {

    err := conn.SendMsg(1, []byte("登录[FromClient]"))
    if err != nil {
        fmt.Println(err)
        return
    }
    time.Sleep(1 * time.Second)
    err = conn.SendMsg(2, []byte("退出[FromClient]"))
    if err != nil {
        fmt.Println(err)
        return
    }
    time.Sleep(1 * time.Second)
    err = conn.SendMsg(3, []byte("单发[FromClient]"))
    if err != nil {
        fmt.Println(err)
        return
    }
    time.Sleep(1 * time.Second)
    err = conn.SendMsg(4, []byte("群发[FromClient]"))
    if err != nil {
        fmt.Println(err)
        return
    }

}

// Runs when the connection is established
func onClientStart(conn ziface.IConnection) {
    fmt.Println("onClientStart is Called ... ")
    go loginLoop(conn)
}

type LoginRouter struct {
    znet.BaseRouter
}

func (this *LoginRouter) Handle(request ziface.IRequest) {
    fmt.Println("登录成功")
}

type LogoutRouter struct {
    znet.BaseRouter
}

func (this *LogoutRouter) Handle(request ziface.IRequest) {
    fmt.Println("退出成功")
}

type SendRouter struct {
    znet.BaseRouter
}

func (this *SendRouter) Handle(request ziface.IRequest) {
    fmt.Println("单发成功")
}

type FullSendRouter struct {
    znet.BaseRouter
}

func (this *FullSendRouter) Handle(request ziface.IRequest) {
    fmt.Println("全发成功")
}
func main() {
    // Create the client
    client := znet.NewClient("127.0.0.1", 10000)
    // Set the hook that runs once the connection is established
    client.SetOnConnStart(onClientStart)
    client.AddRouter(1, &LoginRouter{})
    client.AddRouter(2, &LogoutRouter{})
    client.AddRouter(3, &SendRouter{})
    client.AddRouter(4, &FullSendRouter{})
    // Start the client
    client.Start()

    // Keep the process alive until it is interrupted
    select {}
}

Running and testing

Start all three services (gate-1, im-1, im-2) in a single server process

-> multiProcess git:(main) ✗ go run main.go

output:

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 20000
WsPort: 20001
Name: im-1
KcpPort: 0
Version: 
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode: 
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode: 
RouterSlicesMode: true
LogDir: 
LogFile: 
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 21000
WsPort: 21001
Name: im-2
KcpPort: 0
Version: 
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode: 
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode: 
RouterSlicesMode: true
LogDir: 
LogFile: 
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 10000
WsPort: 10001
Name: gate-1
KcpPort: 0
Version: 
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode: 
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode: 
RouterSlicesMode: true
LogDir: 
LogFile: 
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================
开启前端拦截器

Run the client

go run client/main.go

output:

2024/04/01 23:27:10 [INFO]connection.go:213: [Reader Goroutine is running]
2024/04/01 23:27:10 [DEBUG]connection.go:239: read buffer 00000001000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001010304010ce799bbe5bd95e68890e58a9f00 
登录成功
2024/04/01 23:27:11 [DEBUG]connection.go:239: read buffer 00000002000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001010304010ce98080e587bae68890e58a9f00 
退出成功
2024/04/01 23:27:13 [DEBUG]connection.go:239: read buffer 000000040000000b46756c6c53656e642e2e2e 
全发成功

Run a second client

onClientStart is Called ... 
2024/04/01 23:27:58 [INFO]connection.go:213: [Reader Goroutine is running]
2024/04/01 23:27:58 [DEBUG]connection.go:239: read buffer 00000001000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001020304010ce799bbe5bd95e68890e58a9f00 
登录成功
2024/04/01 23:27:59 [DEBUG]connection.go:239: read buffer 00000002000000654e7f0301010b4e6f64654d65737361676501ff800001050106436f6e6e494401060001064e6f6465494401040001094e6f646547726f7570010c00010454797065010400010444617461010a00000015ff8001020304010ce98080e587bae68890e58a9f00 
退出成功
2024/04/01 23:28:00 [DEBUG]connection.go:239: read buffer 000000030000000753656e642e2e2e 
单发成功
2024/04/01 23:28:01 [DEBUG]connection.go:239: read buffer 000000040000000b46756c6c53656e642e2e2e 
全发成功
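
The read buffer lines can be decoded by hand. Judging from the bytes in the dumps above, each frame appears to start with a 4-byte big-endian message ID, followed by a 4-byte big-endian payload length and then the payload. The sketch below parses a dump under that assumption; Frame and parseDump are hypothetical names, and the layout is inferred from the log rather than taken from the zinx source.

```go
package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// Frame is one decoded message: its ID and raw payload.
type Frame struct {
	MsgID   uint32
	Payload []byte
}

// parseDump decodes a hex dump laid out as
// [4-byte ID][4-byte length][payload], all big-endian.
func parseDump(dump string) (Frame, error) {
	raw, err := hex.DecodeString(strings.TrimSpace(dump))
	if err != nil {
		return Frame{}, fmt.Errorf("not a hex dump: %w", err)
	}
	if len(raw) < 8 {
		return Frame{}, fmt.Errorf("need at least 8 header bytes, got %d", len(raw))
	}
	id := binary.BigEndian.Uint32(raw[0:4])
	size := int(binary.BigEndian.Uint32(raw[4:8]))
	if size < 0 || size > len(raw)-8 {
		return Frame{}, fmt.Errorf("header announces %d payload bytes, only %d present", size, len(raw)-8)
	}
	return Frame{MsgID: id, Payload: raw[8 : 8+size]}, nil
}

func main() {
	// The last dump from the client log above.
	f, err := parseDump("000000040000000b46756c6c53656e642e2e2e ")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("msg %d: %q\n", f.MsgID, f.Payload) // prints: msg 4: "FullSend..."
}
```

Read this way, the replies for messages 3 and 4 carry plain text, while the replies for messages 1 and 2 carry a 0x65 (101) byte payload that contains the NodeMessage field names.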

Start the three services (gate-1, im-1, im-2) as separate server processes

gate-1

multiProcess git:(main) ✗ go run main.go -id=1

output:

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 10000
WsPort: 10001
Name: gate-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================

开启前端拦截器

im-1

multiProcess git:(main) ✗ go run main.go -id=2

output:

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 20000
WsPort: 20001
Name: im-1
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================

im-2

multiProcess git:(main) ✗ go run main.go -id=3

output:

              ██                        
              ▀▀                        
 ████████   ████     ██▄████▄  ▀██  ██▀ 
     ▄█▀      ██     ██▀   ██    ████   
   ▄█▀        ██     ██    ██    ▄██▄   
 ▄██▄▄▄▄▄  ▄▄▄██▄▄▄  ██    ██   ▄█▀▀█▄  
 ▀▀▀▀▀▀▀▀  ▀▀▀▀▀▀▀▀  ▀▀    ▀▀  ▀▀▀  ▀▀▀ 

┌──────────────────────────────────────────────────────┐
│ [Github] https://github.com/aceld                    │
│ [tutorial] https://www.yuque.com/aceld/npyr8s/bgftov │
│ [document] https://www.yuque.com/aceld/tsgooa        │
└──────────────────────────────────────────────────────┘
[Zinx] Version: V1.0, MaxConn: 12000, MaxPacketSize: 4096
===== Zinx Global Config =====
Host: 0.0.0.0
TCPPort: 21000
WsPort: 21001
Name: im-2
KcpPort: 0
Version:
MaxPacketSize: 0
MaxConn: 0
WorkerPoolSize: 0
MaxWorkerTaskLen: 0
WorkerMode:
MaxMsgChanLen: 0
IOReadBuffSize: 0
Mode:
RouterSlicesMode: true
LogDir:
LogFile:
LogSaveDays: 0
LogFileSize: 0
LogCons: false
LogIsolationLevel: 2
HeartbeatMax: 0
CertFile: 
PrivateKeyFile: 
==============================

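With this, the project can be deployed across multiple processes or multiple servers.

Summary

zinx-cluster can now either start multiple services in a single process or run them as multiple processes on multiple servers.

Usage notes

  • RPC is left for users to choose themselves; zinx-cluster does not plan to integrate it.
  • In projects with multiple front ends and multiple processes, user connections have to be managed through an external database.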
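
One way to read the second note: once several gateway processes exist, a backend can no longer look up a user's connection in local memory, so the mapping from user to gateway has to live somewhere every process can reach. The sketch below illustrates the idea with a hypothetical SessionStore interface. The in-memory implementation only stands in for an external store and would not work across processes; all names here are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// SessionStore records which gateway node holds a user's connection.
// A real deployment would back this with an external database.
type SessionStore interface {
	Bind(userID string, gateID int)
	Lookup(userID string) (gateID int, ok bool)
	Unbind(userID string)
}

// memoryStore is a single-process placeholder for the external store.
type memoryStore struct {
	mu     sync.RWMutex
	byUser map[string]int
}

func newMemoryStore() *memoryStore {
	return &memoryStore{byUser: make(map[string]int)}
}

func (s *memoryStore) Bind(userID string, gateID int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.byUser[userID] = gateID
}

func (s *memoryStore) Lookup(userID string) (int, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	id, ok := s.byUser[userID]
	return id, ok
}

func (s *memoryStore) Unbind(userID string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.byUser, userID)
}

func main() {
	var store SessionStore = newMemoryStore()

	// A gateway binds the user at login time.
	store.Bind("alice", 1)

	// A backend node asks which gateway to route a reply through.
	if gate, ok := store.Lookup("alice"); ok {
		fmt.Println("route via gateway node", gate) // prints "route via gateway node 1"
	}

	// The gateway unbinds the user when the connection stops.
	store.Unbind("alice")
	_, ok := store.Lookup("alice")
	fmt.Println("still bound:", ok) // prints "still bound: false"
}
```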

This work is licensed under the CC license. Reposts must credit the author and link to this article.