手把手教你用 Go 写一个基于 Redis 的消息队列

消息队列,我们一般称它为 MQ(Message Queue),两个单词的结合,这两个英文单词想必大家都应该知道吧,其实最熟悉的还是 Queue 吧,即队列。队列是一种先进先出的数据结构,队列的使用还是比较普遍的。在现代软件开发中,消息队列是实现应用解耦、提高系统扩展性和健壮性的关键组件。Go 语言以其并发处理能力著称,是构建高性能消息队列的理想选择。本文将介绍一个使用 Go 语言和 Redis 实现的简单但功能完备的消息队列系统。

概述

消息队列系统通常由生产者(Producer)和消费者(Consumer)组成。生产者负责发送消息,而消费者则负责接收和处理这些消息。我们的消息队列系统使用 Redis 作为后端存储,利用其高性能和持久化特性来保证消息的可靠性。

程序结构

我们的系统由以下几个核心组件构成:

  • Queue:队列管理器,负责创建队列、发布消息和启动消费者。
  • Producer:消息生产者,用于向队列发送消息。
  • Consumer:消息消费者,用于从队列接收并处理消息。
  • Message:消息实体,定义了消息的结构和序列化方法。

核心代码解析

Queue 管理器

queue.go 文件定义了 Queue 结构和相关方法,它封装了一个用于消息队列处理的 redis.Client 实例,。Queue 结构也包含了 Redis 客户端、主题(topic)以及生产者和消费者实例。其中 NewQueue 函数用于创建一个新的消息队列实例,包括生产者和消费者。Start 方法用于启动消费者的监听,Publish 方法用于发布消息。完整代码如下:


package queue

import (
    "context"
    "github.com/go-redis/redis/v8"
    "sync"
)

const (
    HashSuffix = ":hash" // Redis 键后缀,用于哈希表
    SetSuffix = ":set" // Redis 键后缀,用于集合
)

var once sync.Once // 用于确保某些初始化代码只执行一次的全局变量

type Queue struct {
    ctx context.Context // 上下文

    // redis
    redis *redis.Client  // Redis客户端
    topic string         // 主题

    // producer and consumer
    producer *producer   // 生产者
    consumer *consumer   // 消费者

}

func NewQueue(ctx context.Context, redis *redis.Client, opts ...Option) *Queue {
    var queue *Queue

    // 使用sync.Once确保以下代码只执行一次
    once.Do(func() {
        // 定义默认的选项
        defaultOptions := Options{
            topic:   "topic",
            handler: defaultHander,  // 默认处理函数
        }

        // 应用传入的选项(opts)
        for _, apply := range opts {
            apply(&defaultOptions)
        }

        // 创建Queue实例
        queue = &Queue{
            ctx:      ctx,
            redis:    redis,
            topic:    defaultOptions.topic,
            producer: NewProducer(ctx),  // 创建生产者
            consumer: NewConsumer(ctx, defaultOptions.handler),  // 创建消费者,使用处理函数
        }
    })

    // 返回Queue实例
    return queue

}

func (q *Queue) Start() {
    // 启动消费者的监听
    go q.consumer.listen(q.redis, q.topic)
}

func (q *Queue) Publish(msg *Message) (int64, error) {
    // 发布消息
    return q.producer.publish(q.redis, q.topic, msg)
}
  • 你也发现了其中 NewQueue 函数接受一个上下文、Redis 客户端和一个可变数量的选项,用于创建一个新的队列实例。Option 定义了一个使用函数闭包实现的选项模式,用于生成具有不同选项的 Options 结构,其中包含了可配置的字段 topic 和 handler。WithTopic 和 WithHandler 是两个函数,它们接受相应的参数并返回对应的选项函数。当这些选项函数被应用到 Options 结构时,会设置结构体中相应的字段值。queue_option.go 代码如下:
package queue

type Option func(*Options)

type Options struct {
    topic   string
    handler handlerFunc
}

// WithTopic 用于设置选项中的 topic 字段
func WithTopic(topic string) Option {
    return func(opts *Options) {
        opts.topic = topic
    }
}

// WithHandler 用于设置选项中的 handler 字段
func WithHandler(handler handlerFunc) Option {
    return func(opts *Options) {
        opts.handler = handler
    }
}

消息实体

message.go 文件定义了 Message 结构,用于表示消息实体。它包含了消息的唯一标识符(ID)、创建时间、消费时间以及消息体(body)。NewMessage 函数用于创建消息实例。GetScore 函数返回消息的分数,GetId 函数返回消息的 ID,MarshalBinary 和 UnmarshalBinary 函数用于消息的序列化和反序列化。

package queue

import (
    "encoding/json"
    "github.com/satori/go.uuid"
    "time"
)

// Message 定义消息结构
type Message struct {
    Id          string      `json:"id"`
    CreateTime  time.Time   `json:"createTime"`
    ConsumeTime time.Time   `json:"consumeTime"`
    Body        interface{} `json:"body"`
}

// NewMessage 用于创建消息实体
func NewMessage(id string, consumeTime time.Time, body interface{}) *Message {
    if id == "" {
        id = uuid.NewV4().String()
    }
    return &Message{
        Id:          id,
        CreateTime:  time.Now(),
        ConsumeTime: consumeTime,
        Body:        body,
    }
}

// GetScore 用于返回消息的分数
func (m *Message) GetScore() float64 {
    return float64(m.ConsumeTime.Unix())
}

// GetId 用于返回消息的ID
func (m *Message) GetId() string {
    return m.Id
}

// MarshalBinary 用于将消息结构体序列化为二进制数据
func (m *Message) MarshalBinary() ([]byte, error) {
    return json.Marshal(m)
}

// UnmarshalBinary 用于将二进制数据反序列化为消息结构体
func (m *Message) UnmarshalBinary(data []byte) error {
    return json.Unmarshal(data, m)
}

生产者

producer.go 文件定义了 producer 结构和 publish 方法。生产者负责将消息发布到 Redis 的有序集合(sorted set)和哈希表(hash)中。代码创建了一个生产者类型并实现了发布消息到 Redis 的功能。在发布消息时,它首先将消息写入有序集合,然后将消息写入哈希表。这样,通过使用 NewProducer 函数创建生产者实例后,可以使用其 publish 方法将消息发布到 Redis 中保存。 producer.go 代码如下:

package queue

import (
    "context"
    "github.com/go-redis/redis/v8"
)

type producer struct {
    ctx context.Context
}

func NewProducer(ctx context.Context) *producer {
    return &producer{
        ctx: ctx,
    }
}

// Publish方法用于将消息发布到Redis中
func (p *producer) publish(redisClient *redis.Client, topic string, msg *Message) (int64, error) {
    z := &redis.Z{
        Score:  msg.GetScore(),
        Member: msg.GetId(),
    }

    // 将消息写入有序集合
    setKey := topic + SetSuffix
    n, err := redisClient.ZAdd(p.ctx, setKey, z).Result()
    if err != nil {
        return n, err
    }

    // 将消息写入哈希表
    hashKey := topic + HashSuffix
    return redisClient.HSet(p.ctx, hashKey, msg.GetId(), msg).Result()
}

消费者

consumer.go 文件定义了 consumer 结构和 listen 方法。消费者通过监听 Redis 的有序集合来获取消息,并从哈希表中检索消息详情,然后通过提供的处理器(handler)函数进行处理。建了一个消费者类型,并实现了监听消息队列中的消息并处理的功能。在 NewConsumer 方法中,创建了一个消费者实例并初始化了相关参数。在 listen 方法中,通过定时器定时从有序集合获取消息,并通过通道将数据送至处理函数进行最终逻辑处理。这样,通过使用 NewConsumer 函数创建消费者实例后,可以调用其 listen 方法实现对消息队列的监听和处理。consumer.go 代码如下:

package queue

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/go-redis/redis/v8"
    "log"
    "strconv"
    "time"
)

// handlerFunc 是用于处理消息的函数类型
type handlerFunc func(msg Message)

// 默认处理函数,打印消息
func defaultHandler(msg Message) {
    fmt.Println(msg)
}

// consumer 类型包含了消费者相关的字段和方法
type consumer struct {
    ctx      context.Context
    duration time.Duration
    ch       chan []string
    handler  handlerFunc
}

// NewConsumer 用于创建一个消费者实例
func NewConsumer(ctx context.Context, handler handlerFunc) *consumer {
    return &consumer{
        ctx:      ctx,
        duration: time.Second,
        ch:       make(chan []string, 1000),
        handler:  handler,
    }
}

// listen 方法用于监听消息队列中的消息并处理
func (c *consumer) listen(redisClient *redis.Client, topic string) {
    // 从哈希表中获取数据并处理
    go func() {
        for {
            select {
            case ret := <-c.ch:
                // 从哈希表中批量获取数据信息
                key := topic + HashSuffix
                result, err := redisClient.HMGet(c.ctx, key, ret...).Result()
                if err != nil {
                    log.Println(err)
                }

                if len(result) > 0 {
                    redisClient.HDel(c.ctx, key, ret...)
                }

                msg := Message{}
                for _, v := range result {
                    // 由于哈希表和有序集合操作不是原子操作,可能会出现删除了集合中的数据但哈希表中数据未删除的情况
                    if v == nil {
                        continue
                    }
                    str := v.(string)
                    json.Unmarshal([]byte(str), &msg)

                    // 处理逻辑
                    go c.handler(msg)
                }

            }
        }
    }()

    // 定时器用于定时获取消息并处理
    ticker := time.NewTicker(c.duration)
    defer ticker.Stop()
    for {
        select {
        case <-c.ctx.Done():  // 上下文取消,退出监听
            log.Println("consumer quit:", c.ctx.Err())
            return
        case <-ticker.C:  // 定时获取消息
            // 从 Redis 中读取数据
            min := strconv.Itoa(0)
            max := strconv.Itoa(int(time.Now().Unix()))
            opt := &redis.ZRangeBy{
                Min: min,
                Max: max,
            }

            key := topic + SetSuffix
            result, err := redisClient.ZRangeByScore(c.ctx, key, opt).Result()
            if err != nil {
                log.Fatal(err)
                return
            }

            // 获取到数据
            if len(result) > 0 {
                // 从有序集合中移除数据
                redisClient.ZRemRangeByScore(c.ctx, key, min, max)

                // 写入通道,进行哈希表处理
                c.ch <- result
            }
        }
    }
}

使用示例

假设我们有一个简单的场景,需要发送和处理用户行为日志。我们可以定义一个日志消息的生产者和消费者,main.go 如下所示:



var RedisQueue *queue.Queue
var Redis *redis.Client

func mian() {
    // 这里 redis 的链接自己写吧
    client := redis.NewClient(option)

    _, err := client.Ping(context.Background()).Result()

    if err != nil {
        log.Fatal(fmt.Sprintf("Connect to redis: %v", err))
    }

    Redis = client

    // 初始化 redis 消费队列
    InitRedisQueue()
}

// InitRedisQueue 创建延时队列
func InitRedisQueue() {
    // 创建队列
    RedisQueue = queue.NewQueue(context.Background(), Redis,
        queue.WithTopic("send-message"),
        queue.WithHandler(bot.CreateAndSendMessages))

    // 启动消费者
    RedisQueue.Start()
}

// 创建并发送消息
func CreateAndSendMessages() {
    id := uuid.NewV4().String()
    logMsg := queue.NewMessage(id, time.Now(), map[string]interface{}{"user_id": 123, "action": "login"})
    queue.Publish(logMsg)
    if err != nil {
        fmt.Println("send ", err)
    }
}

总结

本文介绍了一个基于 Go 语言和 Redis 实现的消息队列系统。该系统简单、高效,并且易于扩展。无论是在微服务架构中还是在需要解耦生产者和消费者的场景中,它都能提供强大的支持。请注意,这是一篇技术文章,您可以根据需要进行调整和补充。如果您有更多的细节或特定的要求,以便进一步用在生产环境中,请您完善更多的细节,比如:性能调优,特别是考虑到网络延迟和 Redis 实例的性能。

关注我获取更新

本作品采用《CC 协议》,转载必须注明作者和本文链接
微信搜索:上帝喜爱笨人
讨论数量: 3

有github地址嘛?

1个月前 评论
Aliliin (楼主) 1个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!