信息总线

未匹配的标注

简介

面向业务框架应该提供开箱即用的队列服务。

广播信息

如果你的应用运行在K8S环境中,那么通常应用会有多个副本。例如debug开关,站点维护状态,这种数据每个接口回会用上,放到redis都嫌慢的缓存,可以加在应用变量内存中,那么如何保证多个副本的应用内容缓存一致呢。这时候就应该需要一个广播机制;发一个信息,所有人都能收到。
文件 app/queues/kernel.go 这行内容保证没有被注释掉, 那么就启动了广播服务。

func (k *Kernel) Init() {
    k.Listen(jobs.GetAllProvider())
    // 打开广播进程, 默认订阅的是 home_broadcast,
    k.StartBroadcast()
}

广播一个信息, 使用默认通道

func bus() {
   servers.NewQueue().Publish(message.DemoMessage{
      Msg:   "每个副本都执行",
      Count: 1,
   })
}

接收信息,只要一个结构实现了函数 Handler(), 并且把message嵌套进去,那么它就可以自动接收信息,并且执行消费它。

// DemoJob @Bean
// job接收值会摧毁构建新的,所以这里不能使用注入系统,要使用New
type DemoJob struct {
    message.DemoMessage
}

func (d *DemoJob) Handler() {
    fmt.Println(d.Count)
    d.Count++
}

信息队列

复杂应用基本都会有一个信息队列,作为系统的峰谷业务消费、异步执行服务。信息队列是一个常用服务,标配。现在投递一个信息job, 只能执行一次,只能被一个消费者消费。和广播不一样,广播是所有启动的应用都能收到,队列信息job经过Push,只能被一个抢夺并且消费。框架是基于redis, stream实现的信息队列,是可靠的结构,只要你的Handler()没有panic,那么就会自动ack

func bus() {
   servers.NewQueue().Push(message.DemoMessage{
      Msg:   "执行一次",
      Count: 1,
   })
}

消费job和广播一样,只要一个结构实现了函数 Handler(), 并且把message嵌套进去。

延时队列

延时队列用在往后指定事件执行的job的系统,例如商场下单后,系统可以在30分钟检查是否已经支付,如果没有就关闭订单不再等待等等。

func bus() {
servers.NewQueue().Delay(30 * 3600 * time.Second).Push(message.DemoMessage{
      Msg:   "30分钟后执行",
      Count: 1,
   })
}

延时队列是信息队列一个扩展,默认不开启。当前系统默认开启的基于mysql的驱动。当然是插拔设计的,可以提前注册另外的驱动。只要实现接口

// 驱动接口
type DelayQueue interface {
   Push(DelayTask) string
  Del(string) bool
  Run()
}
// *****

// 开启延时队列服务
func (k *Kernel) Init() {
    k.Listen(jobs.GetAllProvider())
    // 打开延时队列
    k.StartDelayQueue()
}

信息设置

package message
type Demo struct {}
设置投递stream名称

stream分组用在管理工具上更容易查看数据

func (receiver Demo) SetQueue() string {
    return "demo_queue"
}
设置投递分组名称

分组控制消费者进程,每个分组名称都会独立一个消费协程,如果有更加重要的job, 需要更快消费,可以设置独立的group

func (receiver Demo) SetGroup() string {
    return "demo_group"
}
设置job路由

组件默认自动计算message路由路径(PkgPath + Name), 如果需要更短或者自定义

func (receiver Demo) SetRoute() string {
    return "demo_route"
}
job需要串行执行

如果你的任务不可以并行消费

func (receiver Demo) SetSerial() bool {
    return true
}

项目地址 通用工具

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 查看所有版本


暂无话题~