Go 优惠券的 Redis Nsq

NSQ 这里使用 Docker 部署,参考: www.nsqio.cn/docker.html

nsq 简单了解

nsqd:一个负责接收、排队、转发消息到客户端的守护进程
nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

docker-compose.yml

# NSQ demo topology: two independent lookupd/nsqd groups, one nsqadmin that
# queries both lookupd instances, and an nsq_to_file archiver for Topic_Demo.
version: '3'
services:
  # Discovery service for the first group (nsqd1, nsqd2).
  nsqlookupd:
    hostname: nsqlookupd
    image: nsqio/nsq
    command: /nsqlookupd --http-address 0.0.0.0:4161 --tcp-address 0.0.0.0:4160 --broadcast-address nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  # First-group broker. --mem-queue-size=0 keeps no messages in memory, so
  # messages are queued on disk under --data-path (mounted from the host).
  nsqd1:
    # Hostname matches the service name and --broadcast-address.
    hostname: nsqd1
    image: nsqio/nsq
    command: /nsqd --mem-queue-size=0 --data-path /usr/local/nsq/bin/data --tcp-address 0.0.0.0:4150 --http-address 0.0.0.0:4151 --broadcast-address=nsqd1 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4150:4150"
      - "4151:4151"
    volumes:
      - "/data/nsq1:/usr/local/nsq/bin/data"
  nsqd2:
    hostname: nsqd2
    image: nsqio/nsq
    command: /nsqd --mem-queue-size=0 --data-path /usr/local/nsq/bin/data --tcp-address 0.0.0.0:4152 --http-address 0.0.0.0:4153 --broadcast-address=nsqd2 --lookupd-tcp-address=nsqlookupd:4160
    depends_on:
      - nsqlookupd
    ports:
      - "4152:4152"
      - "4153:4153"
    volumes:
      - "/data/nsq2:/usr/local/nsq/bin/data"
  # Discovery service for the second group (nsqd21, nsqd22).
  nsqlookupd22:
    hostname: nsqlookupd22
    image: nsqio/nsq
    command: /nsqlookupd --http-address 0.0.0.0:4261 --tcp-address 0.0.0.0:4260 --broadcast-address nsqlookupd22
    ports:
      - "4261:4261"
      - "4260:4260"
  # Second-group brokers register with nsqlookupd22, so that is the service
  # they must wait for. No --data-path or volume is configured for them.
  nsqd21:
    hostname: nsqd21
    image: nsqio/nsq
    command: /nsqd --tcp-address 0.0.0.0:4250 --http-address 0.0.0.0:4251 --broadcast-address=nsqd21 --lookupd-tcp-address=nsqlookupd22:4260
    depends_on:
      - nsqlookupd22
    ports:
      - "4250:4250"
      - "4251:4251"
  nsqd22:
    hostname: nsqd22
    image: nsqio/nsq
    command: /nsqd --tcp-address 0.0.0.0:4252 --http-address 0.0.0.0:4253 --broadcast-address=nsqd22 --lookupd-tcp-address=nsqlookupd22:4260
    depends_on:
      - nsqlookupd22
    ports:
      - "4252:4252"
      - "4253:4253"
  # Web UI; it queries both lookupd instances, so it depends on both.
  nsqadmin:
    hostname: nsqadmin
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address nsqlookupd:4161 --lookupd-http-address nsqlookupd22:4261
    depends_on:
      - nsqlookupd
      - nsqlookupd22
    ports:
      - "4171:4171"
  # Archives Topic_Demo (channel "archive") from the first group to files
  # under the mounted output directory.
  nsq_to_file:
    hostname: nsq_to_file
    image: nsqio/nsq
    command: /nsq_to_file --output-dir /usr/local/nsq/bin/data --topic Topic_Demo --channel=archive --lookupd-http-address nsqlookupd:4161
    depends_on:
      - nsqlookupd
    # NOTE(review): the command above configures no listener on 4181/4182;
    # these mappings look unnecessary - confirm before removing.
    ports:
      - "4181:4181"
      - "4182:4182"
    volumes:
      - "/data/nsq_to_file:/usr/local/nsq/bin/data"

启动集群

  • 启动

docker-compose up -d

nsqadmin Web 管理界面: 127.0.0.1:4171

nsq go客户端

go get -u github.com/nsqio/go-nsq

Redis集群:
安装Redis:redis.io/docs/getting-started/inst...

# Five Redis 7.0.5 nodes for a Redis Cluster, each started from its own
# redis.conf mounted from the host.
#
# Every node uses host networking, i.e. it shares the host's network stack.
# Port mappings must not be combined with `network_mode: host`, so no `ports:`
# section is declared; each node's listening port has to come from the `port`
# directive in its redis.conf (intended ports are noted per node - TODO confirm
# against the actual redis.conf files, which are not part of this file).
#
# NOTE(review): REDISCLI_AUTH is a plaintext credential committed in this file;
# consider supplying it through an env file or a secret instead.
version: '3.1'
services:
  # Node 1 - intended port 6380.
  redis1:
    image: redis:7.0.5
    container_name: redis-1
    restart: always
    network_mode: "host"
    environment:
      # Read by redis-cli as the default password.
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-1/data:/data
      - /data/redis-cluster/redis-1/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 2 - intended port 6381.
  redis2:
    image: redis:7.0.5
    container_name: redis-2
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-2/data:/data
      - /data/redis-cluster/redis-2/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 3 - intended port 6382.
  redis3:
    image: redis:7.0.5
    container_name: redis-3
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-3/data:/data
      - /data/redis-cluster/redis-3/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 4 - intended port 6383.
  redis4:
    image: redis:7.0.5
    container_name: redis-4
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-4/data:/data
      - /data/redis-cluster/redis-4/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]
  # Node 5 - intended port 6384.
  redis5:
    image: redis:7.0.5
    container_name: redis-5
    restart: always
    network_mode: "host"
    environment:
      - REDISCLI_AUTH=Tm@587973
    volumes:
      - /data/redis-cluster/redis-5/data:/data
      - /data/redis-cluster/redis-5/redis.conf:/usr/local/etc/redis/redis.conf
    command: ["redis-server", "/usr/local/etc/redis/redis.conf"]

Go代码

redis

golang.org/x/sync/singleflight
singleflight 可以防缓存击穿

// Cache provides cached access to coupon quantities stored in Redis.
type Cache struct {
    // requestGroup is the singleflight group through which SetCouponQuantity
    // runs its reload, keyed by coupon id.
    requestGroup singleflight.Group
}

// GetCouponQuantity returns the quantity stored in Redis under the key
// "coupon:<CouponId>".
//
// On a cache hit the stored string is parsed as a decimal integer. When the
// key is absent (redis.Nil) the value is reloaded through SetCouponQuantity.
// Any other Redis error is returned with a zero quantity.
func (c *Cache) GetCouponQuantity(CouponId int) (quantity int, err error) {
	id := strconv.Itoa(CouponId)
	cached, getErr := rediser.ClusterClient.Get(context.Background(), "coupon:"+id).Result()

	switch {
	case getErr == nil:
		// Cache hit.
		return strconv.Atoi(cached)
	case getErr == redis.Nil:
		// Cache miss: reload the quantity and cache it.
		return c.SetCouponQuantity(id)
	default:
		return 0, getErr
	}
}

// SetCouponQuantity loads the coupon identified by CouponIdStr from the
// "coupon" table and caches its quantity in Redis under
// "coupon:<CouponIdStr>" with a one-hour expiry.
//
// The reload runs through the singleflight group keyed by CouponIdStr. It
// returns the loaded quantity, or a zero quantity and an error when the
// database query or the Redis write fails, or when the loaded value is not an
// int.
func (c *Cache) SetCouponQuantity(CouponIdStr string) (quantity int, err error) {
	key := "coupon:" + CouponIdStr

	loaded, err, _ := c.requestGroup.Do(CouponIdStr, func() (interface{}, error) {
		ctx := context.Background()

		// Remove any existing value first so that the SetNX below can store
		// the fresh one. The result is deliberately not inspected: a failing
		// Redis is reported by the SetNX call.
		rediser.ClusterClient.Del(ctx, key)

		var coupon models.Coupon
		if err := core.New().Db.Table("coupon").Where("coupon_id = ?", CouponIdStr).Scan(&coupon).Error; err != nil {
			return 0, err
		}

		// SetNX only writes when the key is absent; its boolean result is not
		// inspected, only the command error.
		if err := rediser.ClusterClient.SetNX(ctx, key, coupon.Quantity, time.Hour).Err(); err != nil {
			return 0, err
		}
		return coupon.Quantity, nil
	})
	if err != nil {
		return 0, err
	}

	// Checked assertion: report an unexpected dynamic type as an error
	// instead of panicking.
	n, ok := loaded.(int)
	if !ok {
		return 0, errors.New("coupon quantity has unexpected type")
	}
	return n, nil
}
// ErrCouponInsufficient is returned by CouponQuantityDecr when no coupon is
// left to claim. Its message is unchanged from the previous ad-hoc error.
var ErrCouponInsufficient = errors.New("数量不足")

// CouponQuantityDecr decrements the Redis counter "coupon:<couponId>" by one.
//
// If the counter drops below zero the decrement is undone with an increment
// and ErrCouponInsufficient is returned. If that compensating increment
// fails, its Redis error is returned instead, because the counter has been
// left below zero. A failure of the decrement itself is returned as is.
func (c *Cache) CouponQuantityDecr(couponId int) error {
	ctx := context.Background()
	key := "coupon:" + strconv.Itoa(couponId)

	remaining, err := rediser.ClusterClient.Decr(ctx, key).Result()
	if err != nil {
		return err
	}
	if remaining >= 0 {
		return nil
	}

	// Oversold: give back the unit taken above. The error is checked so a
	// failed rollback is not silently swallowed.
	if err := rediser.ClusterClient.Incr(ctx, key).Err(); err != nil {
		return err
	}
	return ErrCouponInsufficient
}

NSQ

生产者


// Producer is the package-level NSQ producer; it is assigned by InitProducer.
var  Producer *nsq.Producer

// NewNsqProducer holds the nsqd address that InitProducer connects a producer
// to. The two fields are joined as "host:port".
type NewNsqProducer struct {
	Nsqd_host string // host part of the nsqd address
	Nsqd_port int    // port part of the nsqd address
}

// InitProducer creates an NSQ producer for the address
// "<Nsqd_host>:<Nsqd_port>" using the default go-nsq configuration.
//
// On success the producer is stored in the package-level Producer variable
// and returned. On failure the error is printed and returned with a nil
// producer, and Producer is left untouched.
func (p *NewNsqProducer) InitProducer() (*nsq.Producer, error) {
	config := nsq.NewConfig()
	nsqAddr := fmt.Sprintf("%s:%d", p.Nsqd_host, p.Nsqd_port)

	producer, err := nsq.NewProducer(nsqAddr, config)
	if err != nil {
		fmt.Printf("create producer failed,err:%v  \n", err)
		return nil, err
	}

	// Assign the package-level variable only after success, so a failed call
	// cannot overwrite a previously initialised producer.
	Producer = producer
	return Producer, nil
}

消费者

// NsqConsumer carries the settings InitConsumer uses to build a consumer.
type NsqConsumer struct {
    // Title is copied into the coupon.NsqConsumer handler that InitConsumer
    // registers.
    Title string
}

// InitConsumer creates an NSQ consumer for topic/channel, registers a
// coupon.NsqConsumer handler carrying the receiver's Title, and connects the
// consumer through the nsqlookupd instance at address.
//
// If the consumer cannot be created, the error is printed and returned. If
// connecting to nsqlookupd fails, the error is returned together with the
// already-created consumer.
func (nc *NsqConsumer) InitConsumer(topic string, channel string, address string) (consumer *nsq.Consumer, err error) {
	// Start from the default go-nsq configuration and poll nsqlookupd every
	// 20 seconds.
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = 20 * time.Second

	// Create the consumer for the requested topic/channel.
	consumer, err = nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		fmt.Printf("create consumer failed, err:%v\n", err)
		return consumer, err
	}

	// Register the message handler before connecting.
	handler := &coupon.NsqConsumer{Title: nc.Title}
	consumer.AddHandler(handler)

	// Connect via nsqlookupd discovery. The alternative is a direct TCP
	// connection to one nsqd: consumer.ConnectToNSQD(address).
	return consumer, consumer.ConnectToNSQLookupd(address)
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!