Go 优惠券的 Redis Nsq
Nsq 这里是用docker: www.nsqio.cn/docker.html
nsq 简单了解
nsqd:一个负责接收、排队、转发消息到客户端的守护进程
nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
docker-compose.yml
# NSQ demo topology: two independent clusters (nsqlookupd + nsqd1/nsqd2 and
# nsqlookupd22 + nsqd21/nsqd22), one nsqadmin UI that queries both lookupd
# instances, and an nsq_to_file archiver for Topic_Demo on the first cluster.
version: '3'
services:
  # Cluster 1 discovery service: TCP 4160 (nsqd registration), HTTP 4161 (lookups).
  nsqlookupd:
    hostname: nsqlookupd
    image: nsqio/nsq
    command: /nsqlookupd --http-address 0.0.0.0:4161 --tcp-address 0.0.0.0:4160 --broadcast-address nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  # Cluster 1, node 1. --mem-queue-size=0 is intended to push every message to
  # the disk-backed queue under --data-path, which is persisted via the volume.
  nsqd1:
    # Hostname matches the service name and the --broadcast-address below.
    hostname: nsqd1
    image: nsqio/nsq
    command: /nsqd --mem-queue-size=0 --data-path /usr/local/nsq/bin/data --tcp-address 0.0.0.0:4150 --http-address 0.0.0.0:4151 --broadcast-address=nsqd1 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
    volumes:
      - "/data/nsq1:/usr/local/nsq/bin/data"
  # Cluster 1, node 2.
  nsqd2:
    hostname: nsqd2
    image: nsqio/nsq
    command: /nsqd --mem-queue-size=0 --data-path /usr/local/nsq/bin/data --tcp-address 0.0.0.0:4152 --http-address 0.0.0.0:4153 --broadcast-address=nsqd2 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4152:4152"
      - "4153:4153"
    volumes:
      - "/data/nsq2:/usr/local/nsq/bin/data"
  # Cluster 2 discovery service: TCP 4260, HTTP 4261.
  nsqlookupd22:
    hostname: nsqlookupd22
    image: nsqio/nsq
    command: /nsqlookupd --http-address 0.0.0.0:4261 --tcp-address 0.0.0.0:4260 --broadcast-address nsqlookupd22
    ports:
      - "4261:4261"
      - "4260:4260"
  # Cluster 2, node 1 (no --data-path and no volume are configured for this node).
  nsqd21:
    hostname: nsqd21
    image: nsqio/nsq
    command: /nsqd --tcp-address 0.0.0.0:4250 --http-address 0.0.0.0:4251 --broadcast-address=nsqd21 --lookupd-tcp-address=nsqlookupd22:4260
    # Wait for the lookupd this node actually registers with.
    depends_on:
      - nsqlookupd22
    ports:
      - "4250:4250"
      - "4251:4251"
  # Cluster 2, node 2.
  nsqd22:
    hostname: nsqd22
    image: nsqio/nsq
    command: /nsqd --tcp-address 0.0.0.0:4252 --http-address 0.0.0.0:4253 --broadcast-address=nsqd22 --lookupd-tcp-address=nsqlookupd22:4260
    depends_on:
      - nsqlookupd22
    ports:
      - "4252:4252"
      - "4253:4253"
  # Web UI on port 4171; it queries both lookupd instances, so it waits for both.
  nsqadmin:
    hostname: nsqadmin
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address nsqlookupd:4161 --lookupd-http-address nsqlookupd22:4261
    depends_on:
      - nsqlookupd
      - nsqlookupd22
    ports:
      - "4171:4171"
  # Archives Topic_Demo (channel "archive") from cluster 1 into the mounted directory.
  nsq_to_file:
    hostname: nsq_to_file
    image: nsqio/nsq
    command: /nsq_to_file --output-dir /usr/local/nsq/bin/data --topic Topic_Demo --channel=archive --lookupd-http-address nsqlookupd:4161
    depends_on:
      - nsqlookupd
    # NOTE(review): nsq_to_file is given no listen address above, so these
    # mappings are probably unnecessary — confirm before removing.
    ports:
      - "4181:4181"
      - "4182:4182"
    volumes:
      - "/data/nsq_to_file:/usr/local/nsq/bin/data"
启动集群
- 启动
docker-compose up -d
nsqadmin Web 管理界面: 127.0.0.1:4171
nsq go客户端
go get -u github.com/nsqio/go-nsq
Redis集群:
安装redis:redis.io/docs/getting-started/inst...
# Five Redis 7.0.5 nodes intended to form a Redis Cluster. Every node uses the
# host network, so each one is reached directly on the port set by the `port`
# directive in its mounted redis.conf (expected: 6380-6384, one per node).
# `ports:` mappings are omitted because port publishing does not apply to
# containers running with network_mode: host.
#
# NOTE(review): REDISCLI_AUTH is a hard-coded credential; prefer supplying it
# from an env file or a secret instead of committing it.
version: '3.1'
services:
  # Node 1 — expected to listen on 6380.
  redis1:
    image: redis:7.0.5
    container_name: redis-1
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-1/data:/data
      - /data/redis-cluster/redis-1/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 2 — expected to listen on 6381.
  redis2:
    image: redis:7.0.5
    container_name: redis-2
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-2/data:/data
      - /data/redis-cluster/redis-2/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 3 — expected to listen on 6382.
  redis3:
    image: redis:7.0.5
    container_name: redis-3
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-3/data:/data
      - /data/redis-cluster/redis-3/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 4 — expected to listen on 6383.
  redis4:
    image: redis:7.0.5
    container_name: redis-4
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-4/data:/data
      - /data/redis-cluster/redis-4/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 5 — expected to listen on 6384.
  redis5:
    image: redis:7.0.5
    container_name: redis-5
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-5/data:/data
      - /data/redis-cluster/redis-5/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
Go代码
redis
golang.org/x/sync/singleflight
singleflight 可以防缓存击穿
// Cache groups the coupon-quantity caching helpers that sit in front of the
// Redis cluster client.
type Cache struct {
// requestGroup is the singleflight group SetCouponQuantity runs its database
// reload through, keyed by coupon id, so concurrent reloads of the same
// coupon in this process share one execution.
requestGroup singleflight.Group
}
// GetCouponQuantity returns the remaining quantity of the coupon CouponId.
//
// It reads the "coupon:<id>" key from the Redis cluster. On a cache miss
// (redis.Nil) it delegates to SetCouponQuantity, which loads the value from
// the database and caches it. Any other Redis error is returned as is, and a
// cached value that is not a valid integer yields the strconv.Atoi error.
func (c *Cache) GetCouponQuantity(CouponId int) (quantity int, err error) {
	CouponIdStr := strconv.Itoa(CouponId)
	quantityStr, err := rediser.ClusterClient.Get(context.Background(), "coupon:"+CouponIdStr).Result()
	switch {
	case err == nil:
		return strconv.Atoi(quantityStr)
	case errors.Is(err, redis.Nil):
		// Key absent: fall back to the database and repopulate the cache.
		// errors.Is also recognizes redis.Nil when it has been wrapped.
		return c.SetCouponQuantity(CouponIdStr)
	default:
		return 0, err
	}
}
// SetCouponQuantity reloads the quantity of coupon CouponIdStr from the
// "coupon" table and caches it under "coupon:<id>" for one hour.
//
// The reload runs through singleflight keyed by the coupon id, so concurrent
// callers in this process share a single database query; this is what guards
// against cache breakdown when a hot key expires.
//
// It returns the quantity read from the database, or the first error from
// Redis or the database. A value of unexpected dynamic type is reported as an
// error instead of panicking.
func (c *Cache) SetCouponQuantity(CouponIdStr string) (quantity int, err error) {
	key := "coupon:" + CouponIdStr
	quantityAny, err, _ := c.requestGroup.Do(CouponIdStr, func() (interface{}, error) {
		ctx := context.Background()
		// Remove any existing value first; SetNX below only writes when the
		// key is absent, so without this an old value would be kept.
		// NOTE(review): across several processes this Del can discard
		// decrements already applied by another instance — confirm intent.
		if err := rediser.ClusterClient.Del(ctx, key).Err(); err != nil {
			return 0, err
		}
		var coupon models.Coupon
		if err := core.New().Db.Table("coupon").Where("coupon_id = ?", CouponIdStr).Scan(&coupon).Error; err != nil {
			return 0, err
		}
		if err := rediser.ClusterClient.SetNX(ctx, key, coupon.Quantity, time.Hour*1).Err(); err != nil {
			return 0, err
		}
		return coupon.Quantity, nil
	})
	if err != nil {
		return 0, err
	}
	// Checked assertion: a non-int Quantity field must not crash the caller.
	quantity, ok := quantityAny.(int)
	if !ok {
		return 0, fmt.Errorf("coupon %s: unexpected quantity type %T", CouponIdStr, quantityAny)
	}
	return quantity, nil
}
// CouponQuantityDecr atomically takes one unit from the cached stock of the
// coupon couponId.
//
// The check and the decrement run inside one Lua script, so the counter never
// goes negative and a missing key is never created. A plain DECR on an absent
// or expired key would create it at -1 without a TTL, and compensating with
// INCR would leave a permanent 0 ("sold out") entry in the cache.
//
// It returns nil when a unit was taken, "数量不足" when the cached stock is
// zero or below, a distinct error when the key is not cached (callers are
// expected to populate it first, e.g. through GetCouponQuantity), or the
// underlying Redis error.
func (c *Cache) CouponQuantityDecr(couponId int) error {
	// Script results: -2 = key missing, -1 = out of stock, >= 0 = new stock.
	const decrScript = `
local stock = redis.call('GET', KEYS[1])
if not stock then
  return -2
end
if tonumber(stock) <= 0 then
  return -1
end
return redis.call('DECR', KEYS[1])`

	key := "coupon:" + strconv.Itoa(couponId)
	remaining, err := rediser.ClusterClient.Eval(context.Background(), decrScript, []string{key}).Int64()
	if err != nil {
		return err
	}
	switch remaining {
	case -2:
		return errors.New("优惠券库存未缓存")
	case -1:
		return errors.New("数量不足")
	}
	return nil
}
NSQ
生产者
// Producer is the package-level NSQ producer; it is assigned by
// NewNsqProducer.InitProducer.
var Producer *nsq.Producer
// NewNsqProducer holds the nsqd address that InitProducer uses to create the
// NSQ producer.
type NewNsqProducer struct {
// Nsqd_host is the host part of the nsqd address.
Nsqd_host string
// Nsqd_port is the port part of the nsqd address; InitProducer joins the two
// fields as "host:port".
Nsqd_port int
}
// InitProducer creates an NSQ producer for the nsqd at Nsqd_host:Nsqd_port
// using the default configuration, stores it in the package-level Producer,
// and returns it.
//
// On failure the package-level Producer is left untouched, so a previously
// initialized producer is not replaced by nil, and the error is returned
// wrapped with the target address.
//
// NOTE(review): creating the producer does not by itself show that nsqd is
// reachable; call Ping on the result if that needs verifying — confirm.
func (NP *NewNsqProducer) InitProducer() (*nsq.Producer, error) {
	nsqAddr := fmt.Sprintf("%s:%d", NP.Nsqd_host, NP.Nsqd_port)
	p, err := nsq.NewProducer(nsqAddr, nsq.NewConfig())
	if err != nil {
		return nil, fmt.Errorf("create nsq producer for %s: %w", nsqAddr, err)
	}
	Producer = p
	return p, nil
}
消费者
// NsqConsumer carries the settings InitConsumer needs to build an NSQ consumer.
type NsqConsumer struct {
// Title is copied into the coupon.NsqConsumer handler registered by InitConsumer.
Title string
}
// InitConsumer creates an NSQ consumer for topic/channel, registers a
// coupon.NsqConsumer handler carrying NC.Title, and connects it to the
// nsqlookupd HTTP endpoint at address.
//
// It returns the connected consumer. On any failure it returns a nil consumer
// and an error wrapped with context; a consumer that was created but could not
// be connected is stopped first so its handler does not keep running.
func (NC *NsqConsumer) InitConsumer(topic string, channel string, address string) (consumer *nsq.Consumer, err error) {
	// Start from the default NSQ client configuration.
	config := nsq.NewConfig()
	// Re-query nsqlookupd for producers of the topic every 20 seconds.
	config.LookupdPollInterval = 20 * time.Second

	consumer, err = nsq.NewConsumer(topic, channel, config)
	if err != nil {
		return nil, fmt.Errorf("create nsq consumer for %s/%s: %w", topic, channel, err)
	}

	// The handler is registered before connecting.
	consumer.AddHandler(&coupon.NsqConsumer{Title: NC.Title})

	// Discover nsqd nodes through nsqlookupd instead of dialing a single nsqd
	// with ConnectToNSQD; discovery copes better with nodes joining or leaving.
	if err = consumer.ConnectToNSQLookupd(address); err != nil {
		consumer.Stop()
		return nil, fmt.Errorf("connect nsq consumer to lookupd %s: %w", address, err)
	}
	return consumer, nil
}
本作品采用《CC 协议》,转载必须注明作者和本文链接