本书未发布

ZooKeeper - 发布/订阅

未匹配的标注

发布者/订阅者

// Publisher writes topic data through a ZooKeeper connection.
type Publisher struct {
	// conn is the connection every Publisher method issues its requests on.
	conn *zk.Conn
}

// NewPublisher connects to the ZooKeeper servers listed in hosts, passing
// time.Second as the timeout argument of zk.Connect, and returns a Publisher
// that wraps the resulting connection.
//
// It panics if zk.Connect reports an error.
func NewPublisher(hosts []string) *Publisher {
	const timeout = time.Second

	zkConn, _, connErr := zk.Connect(hosts, timeout)
	if connErr != nil {
		panic(connErr)
	}

	pub := new(Publisher)
	pub.conn = zkConn
	return pub
}

// CreateTopic creates the node at path with an empty payload, flags 0 and the
// ACL produced by zk.WorldACL(zk.PermAll).
//
// It returns the value reported by Create together with any error. When
// Create fails with zk.ErrNodeExists the call is treated as a success and
// path itself is returned with a nil error.
func (p *Publisher) CreateTopic(path string) (ret string, err error) {
	// Flag values accepted by Create:
	//   0                     persistent; stays until it is deleted explicitly
	//   zk.FlagEphemeral = 1  ephemeral; the node is removed when the session ends
	//   zk.FlagSequence  = 2  a sequence number is appended to the node name
	//   3                     ephemeral and sequential combined
	const persistent int32 = 0

	ret, err = p.conn.Create(path, []byte{}, persistent, zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		// The topic is already there, which is what the caller wanted.
		ret, err = path, nil
	}
	return ret, err
}

// Publish replaces the data stored at topic with data.
//
// The node is read first to obtain its current version, and that version is
// passed to Set. An error from either call is returned to the caller
// unchanged, so it can still be compared against the zk sentinel errors.
// A Set failure is additionally printed to standard output.
func (p *Publisher) Publish(topic, data string) (err error) {
	// The Get error must be checked before stat is used: on failure stat may
	// be nil or carry a meaningless version.
	_, stat, err := p.conn.Get(topic)
	if err != nil {
		return err
	}

	if _, err = p.conn.Set(topic, []byte(data), stat.Version); err != nil {
		fmt.Println("err", err)
	}
	return err
}

// DeleteTopic removes the node at topic.
//
// The node is read first to obtain its current version, and that version is
// passed to Delete. An error from either call is returned unchanged.
func (p *Publisher) DeleteTopic(topic string) (err error) {
	// The Get error must be checked before stat is used: on failure stat may
	// be nil or carry a meaningless version.
	_, stat, err := p.conn.Get(topic)
	if err != nil {
		return err
	}
	return p.conn.Delete(topic, stat.Version)
}
// Subscriber watches topic nodes through a ZooKeeper connection.
type Subscriber struct {
	// conn is the connection every Subscriber method issues its requests on.
	conn *zk.Conn
}

// NewSubscriber connects to the ZooKeeper servers listed in hosts, passing
// time.Second as the timeout argument of zk.Connect, and returns a Subscriber
// that wraps the resulting connection.
//
// It panics if zk.Connect reports an error.
func NewSubscriber(hosts []string) *Subscriber {
	const timeout = time.Second

	zkConn, _, connErr := zk.Connect(hosts, timeout)
	if connErr != nil {
		panic(connErr)
	}

	sub := new(Subscriber)
	sub.conn = zkConn
	return sub
}

// subscribe blocks, reporting events for topic on standard output.
//
// Each iteration calls ExistsW, prints the existence flag and stat it
// returned, and then waits for one event on the returned channel:
//   - zk.EventNodeDeleted: a message is printed and subscribe returns nil.
//   - zk.EventNodeDataChanged: the node is read with Get and its data printed.
//   - any other event type: the type is printed.
//
// After a non-terminal event the loop calls ExistsW again. subscribe returns
// a non-nil error only when ExistsW fails.
func (s *Subscriber) subscribe(topic string) error {
	for {
		isExist, stat, ch, err := s.conn.ExistsW(topic)
		if err != nil {
			return err
		}
		fmt.Println("isExist", isExist, stat)

		event := <-ch
		switch event.Type {
		case zk.EventNodeDeleted:
			fmt.Println(topic, "node delete")
			return nil
		case zk.EventNodeDataChanged:
			data, _, err := s.conn.Get(topic)
			if err != nil {
				// A failed read yields no payload; report the error and go
				// back to watching instead of printing empty data.
				fmt.Println("err", err)
				continue
			}
			fmt.Println("data", data)
		default:
			fmt.Println("event type", event.Type)
		}
	}
}

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~