信息总线
简介
面向业务框架应该提供开箱即用的队列服务。
广播信息
如果你的应用运行在K8S环境中,那么通常应用会有多个副本。例如debug开关,站点维护状态,这种数据每个接口回会用上,放到redis都嫌慢的缓存,可以加在应用变量内存中,那么如何保证多个副本的应用内容缓存一致呢。这时候就应该需要一个广播机制;发一个信息,所有人都能收到。
文件 app/queues/kernel.go
这行内容保证没有被注释掉, 那么就启动了广播服务。
func (k *Kernel) Init() {
k.Listen(jobs.GetAllProvider())
// 打开广播进程, 默认订阅的是 home_broadcast,
k.StartBroadcast()
}
广播一个信息, 使用默认通道
func bus() {
servers.NewQueue().Publish(message.DemoMessage{
Msg: "每个副本都执行",
Count: 1,
})
}
接收信息,只要一个结构实现了函数 Handler(), 并且把message嵌套进去,那么它就可以自动接收信息,并且执行消费它。
// DemoJob @Bean
// job接收值会摧毁构建新的,所以这里不能使用注入系统,要使用New
type DemoJob struct {
message.DemoMessage
}
func (d *DemoJob) Handler() {
fmt.Println(d.Count)
d.Count++
}
信息队列
复杂应用基本都会有一个信息队列
,作为系统的峰谷
业务消费、异步
执行服务。信息队列是一个常用服务,标配。现在投递一个信息jo
b, 只能执行一次,只能被一个消费者消费。和广播不一样,广播是所有启动的应用都能收到,队列信息job
经过Push
,只能被一个抢夺并且消费。框架是基于redis
, stream
实现的信息队列,是可靠的结构,只要你的Handler()
没有panic
,那么就会自动ack
。
func bus() {
servers.NewQueue().Push(message.DemoMessage{
Msg: "执行一次",
Count: 1,
})
}
消费job和广播一样,只要一个结构实现了函数 Handler(), 并且把message嵌套进去。
延时队列
延时队列用在往后指定事件执行的job的系统,例如商场下单后,系统可以在30分钟检查是否已经支付,如果没有就关闭订单不再等待等等。
func bus() {
servers.NewQueue().Delay(30 * 3600 * time.Second).Push(message.DemoMessage{
Msg: "30分钟后执行",
Count: 1,
})
}
延时队列是信息队列一个扩展,默认不开启。当前系统默认开启的基于mysql的驱动。当然是插拔设计的,可以提前注册另外的驱动。只要实现接口
// 驱动接口
type DelayQueue interface {
Push(DelayTask) string
Del(string) bool
Run()
}
// *****
// 开启延时队列服务
func (k *Kernel) Init() {
k.Listen(jobs.GetAllProvider())
// 打开延时队列
k.StartDelayQueue()
}
信息设置
package message
type Demo struct {}
设置投递stream名称
stream分组用在管理工具上更容易查看数据
func (receiver Demo) SetQueue() string {
return "demo_queue"
}
设置投递分组名称
分组控制消费者进程,每个分组名称都会独立一个消费协程,如果有更加重要的job, 需要更快消费,可以设置独立的group
func (receiver Demo) SetGroup() string {
return "demo_group"
}
设置job路由
组件默认自动计算message路由路径(PkgPath + Name), 如果需要更短或者自定义
func (receiver Demo) SetRoute() string {
return "demo_route"
}
job需要串行执行
如果你的任务不可以并行消费
func (receiver Demo) SetSerial() bool {
return true
}