使用 Go 和 ReactJS 构建聊天系统(四):处理多个客户端

是时候实现处理多个客户端并向每个连接的客户端广播任何收到的消息的功能了。本节结束后,我们将:

  • 实现 Pool 机制,该机制将有效地使我们能够跟踪到 WebSocket 服务器中的连接数。
  • 我们还可以将任何收到的消息广播到连接池中的所有连接。
  • 当另一个客户端连接或断开连接时,我们还可以通知现有所有的客户端。

在本节完成后,我们的应用程序将像下面一样:

Chat Application Screenshot

拆分我们的 Websocket 代码

既然我们已经完成了必要的内部清理工作,那么我们可以继续改进代码库。我们将把一些应用程序分成多个子包,以便于开发。

现在,理想情况下,您的 main.go 文件应该只是 Go 应用程序的入口点,应该相当小,并可以调用项目中的其他软件包。

注意: 我们将基于非官方的 Go 项目标准来进行项目布局 —— golang-standards/project-layout

让我们在 backend 目录中创建一个名为 pkg / 的新目录。在其中,我们将要创建另一个名为 websocket / 的目录,其中将包含一个 websocket.go 文件。

我们将把目前在 main.go 文件中拥有的许多 WebSocket 特定代码移至新的 websocket.go 文件中。

注意: 不过要注意的一件事是,当我们复制函数时,我们需要将要在项目的其余部分中显示的每个函数的首字母大写。

package websocket

import (
    "fmt"
    "io"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool { return true },
}

func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
    ws, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return ws, err
    }
    return ws, nil
}

func Reader(conn *websocket.Conn) {
    for {
        messageType, p, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }

        fmt.Println(string(p))

        if err := conn.WriteMessage(messageType, p); err != nil {
            log.Println(err)
            return
        }
    }
}

func Writer(conn *websocket.Conn) {
    for {
        fmt.Println("Sending")
        messageType, r, err := conn.NextReader()
        if err != nil {
            fmt.Println(err)
            return
        }
        w, err := conn.NextWriter(messageType)
        if err != nil {
            fmt.Println(err)
            return
        }
        if _, err := io.Copy(w, r); err != nil {
            fmt.Println(err)
            return
        }
        if err := w.Close(); err != nil {
            fmt.Println(err)
            return
        }
    }
}

现在我们已经创建了这个新的 websocket 包,然后我们想更新我们的 main.go 文件以调出这个包。我们首先必须在文件顶部的导入列表中添加一个新导入,然后我们可以通过在调用之前添加 websocket. 来调用该包中的函数:

package main

import (
    "fmt"
    "net/http"

    "github.com/TutorialEdge/realtime-chat-go-react/pkg/websocket"
)

func serveWs(w http.ResponseWriter, r *http.Request) {
    ws, err := websocket.Upgrade(w, r)
    if err != nil {
        fmt.Fprintf(w, "%+V\n", err)
    }
    go websocket.Writer(ws)
    websocket.Reader(ws)
}

func setupRoutes() {
    http.HandleFunc("/ws", serveWs)
}

func main() {
    fmt.Println("Distributed Chat App v0.01")
    setupRoutes()
    http.ListenAndServe(":8080", nil)
}

进行了这些新的更改后,我们应该检查一下自己所做的工作并未破坏我们现有的功能。尝试再次运行后端和前端,并确保仍然可以发送和接收消息:

$ go run main.go

如果成功,我们可以继续扩展代码库以处理多个客户端。

至此,您的目录结构应如下所示:

- backend/
- - pkg/
- - - websocket/
- - - - websocket.go
- - main.go
- - go.mod
- - go.sum
- frontend/
- ...

处理多个客户端

太好了,现在我们已经完成了基本的整理工作,可以继续改进后端并实施一种机制来处理多个客户。

为此,我们需要考虑如何处理与 WebSocket 服务器的连接。 每当建立新连接时,我们都必须将它们添加到现有连接池中,并确保每次发送消息时,该池中的每个人都会收到该消息。

使用通道

我们将在具有大量并发连接的系统上进行工作。对于每个并发连接,都会在连接期间启动一个新的 goroutine。这意味着我们必须担心这些并发的 goroutines 之间的通信,并确保我们所做的一切都是线程安全的。

这意味着,当我们进一步实现 Pool 结构时,我们必须考虑使用 sync.Mutexgoroutines 互斥以同时访问/修改我们的数据,或者我们可以使用 channels

对于这个项目,我认为我们最好使用 channels 并使用它们在多个并发的 goroutines 之间以安全的方式进行通信。

注意: 如果您想了解有关 Go 频道的更多信息,可以在这里查看我的其他文章: Go 频道指南

Client.go

首先创建一个名为 Client.go 的新文件,该文件将位于我们的 pkg / websocket 目录中,并在其中定义一个 Client 结构,其中包含以下内容:

  • ID —— 特定连接的唯一可识别字符串
  • Conn —— 指向 websocket.Conn 对象的指针
  • Pool —— 指向该客户端将参与其中的 Pool 的指针

我们还将定义一个 Read() 方法,该方法将不断侦听此 Client 的 WebSocket 连接上通过的新消息。

如果有任何消息,它将把这些消息传递到 Pool 的 Broadcast 频道,该通道随后将接收到的消息广播到池中的每个客户端。

package websocket

import (
    "fmt"
    "log"
    "sync"

    "github.com/gorilla/websocket"
)

type Client struct {
    ID   string
    Conn *websocket.Conn
    Pool *Pool
}

type Message struct {
    Type int    `json:"type"`
    Body string `json:"body"`
}

func (c *Client) Read() {
    defer func() {
        c.Pool.Unregister <- c
        c.Conn.Close()
    }()

    for {
        messageType, p, err := c.Conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }
        message := Message{Type: messageType, Body: string(p)}
        c.Pool.Broadcast <- message
        fmt.Printf("Message Received: %+v\n", message)
    }
}

太好了,既然我们已经在代码中定义了客户,那么我们就可以继续实施我们的 Pool 了。

Pool 结构

我们将在与 client.go 文件和 websocket.go 文件同一目录下创建 pool.go 文件。

让我们从定义一个 Pool 结构开始,该结构将包含并发通信所需的所有 channels 以及客户端的 map

package websocket

import "fmt"

type Pool struct {
    Register   chan *Client
    Unregister chan *Client
    Clients    map[*Client]bool
    Broadcast  chan Message
}

func NewPool() *Pool {
    return &Pool{
        Register:   make(chan *Client),
        Unregister: make(chan *Client),
        Clients:    make(map[*Client]bool),
        Broadcast:  make(chan Message),
    }
}

我们需要确保应用程序中只有一个点能够写入 WebSocket 连接,否则我们将面临并发写入问题。因此,让我们定义 Start() 方法,该方法将不断监听传递给我们 Pool 中任何一个通道的内容,然后,如果其中任何一个通道收到了任何内容,它将采取相应的措施。

  • 注册 —— 当有新客户端连接时,我们的注册频道将向此 Pool 中的所有客户端发送 New User Joined...
  • 取消注册 —— 将取消注册用户,并在客户端断开连接时通知 Pool。
  • 客户端 —— 客户到布尔值的映射。 我们可以使用布尔值来指示活动/非活动状态,但不能根据浏览器的焦点进一步断开连接。
  • 广播 —— 一个通道,该通道在传递消息时将循环通过 Pool 中的所有客户端,并通过套接字连接发送消息。

现在让我们编写代码:

func (pool *Pool) Start() {
    for {
        select {
        case client := <-pool.Register:
            pool.Clients[client] = true
            fmt.Println("Size of Connection Pool: ", len(pool.Clients))
            for client, _ := range pool.Clients {
                fmt.Println(client)
                client.Conn.WriteJSON(Message{Type: 1, Body: "New User Joined..."})
            }
            break
        case client := <-pool.Unregister:
            delete(pool.Clients, client)
            fmt.Println("Size of Connection Pool: ", len(pool.Clients))
            for client, _ := range pool.Clients {
                client.Conn.WriteJSON(Message{Type: 1, Body: "User Disconnected..."})
            }
            break
        case message := <-pool.Broadcast:
            fmt.Println("Sending message to all clients in Pool")
            for client, _ := range pool.Clients {
                if err := client.Conn.WriteJSON(message); err != nil {
                    fmt.Println(err)
                    return
                }
            }
        }
    }
}

Websocket.go

太棒了,让我们对 websocket.go 文件进行一些更小的更改,并删除一些不再需要的功能和方法:

package websocket

import (
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
    CheckOrigin: func(r *http.Request) bool { return true },
}

func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return nil, err
    }

    return conn, nil
}

更新我们的 main.go 文件:

最后,我们需要更新我们的 main.go 文件以在每个连接上创建一个新的 Client 并将该客户端注册到 Pool 中:

package main

import (
    "fmt"
    "net/http"

    "github.com/TutorialEdge/realtime-chat-go-react/pkg/websocket"
)

func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
    fmt.Println("WebSocket Endpoint Hit")
    conn, err := websocket.Upgrade(w, r)
    if err != nil {
        fmt.Fprintf(w, "%+v\n", err)
    }

    client := &websocket.Client{
        Conn: conn,
        Pool: pool,
    }

    pool.Register <- client
    client.Read()
}

func setupRoutes() {
    pool := websocket.NewPool()
    go pool.Start()

    http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
        serveWs(pool, w, r)
    })
}

func main() {
    fmt.Println("Distributed Chat App v0.01")
    setupRoutes()
    http.ListenAndServe(":8080", nil)
}

检查工作

现在,我们已经进行了所有必要的更改,我们应该在一个合适的位置来测试我们所做的事情,并确保一切都按预期进行。

启动您的后端应用程序:

$ go run main.go
Distributed Chat App v0.01

如果您在几个浏览器标签中打开 http://localhost:3000/,您应该注意到这些标签会自动连接到我们的后端 WebSocket 服务器,我们现在可以发送和接收来自同一 Pool 中连接的其他客户端的消息!

Chat Application Screenshot

总结

因此,在本系列的这一部分中,我们设法实现了一种处理多个客户端并向连接 Pool 中连接的每个人广播消息的方法。

现在事情开始变得更加有趣了。我们可以在下节中开始添加一些很酷的新功能,例如自定义消息。

本文译自tutorialedge

本作品采用《CC 协议》,转载必须注明作者和本文链接
最初的时候也是最苦的时候,最苦的时候也是最酷的时候。
本帖由系统于 3年前 自动加精
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!