本书未发布
ZooKeeper - 发布/订阅
发布者/订阅者
// Publisher wraps a ZooKeeper connection and uses it to create, update and
// delete the nodes that act as topics.
type Publisher struct {
// conn is the ZooKeeper client connection every method operates on.
conn *zk.Conn
}
// NewPublisher connects to the ZooKeeper servers listed in hosts, passing
// time.Second as the timeout argument of zk.Connect, and returns a Publisher
// that owns the resulting connection.
//
// The second value returned by zk.Connect is discarded. If zk.Connect reports
// an error the function panics with it.
func NewPublisher(hosts []string) *Publisher {
	c, _, connErr := zk.Connect(hosts, time.Second)
	if connErr != nil {
		panic(connErr)
	}
	return &Publisher{conn: c}
}
// CreateTopic creates an empty node at path to serve as a topic and returns
// the path reported by Create.
//
// An already existing node is treated as success: path itself is returned
// with a nil error. Any other error from Create is returned unchanged together
// with whatever path value Create produced.
func (p *Publisher) CreateTopic(path string) (ret string, err error) {
	// Flag values accepted by Create:
	//   0                     persistent; stays until deleted explicitly
	//   zk.FlagEphemeral = 1  ephemeral; removed when the session ends
	//   zk.FlagSequence  = 2  a sequence number is appended to the node name
	//   3                     ephemeral and sequential combined
	// Topics use the persistent form.
	const persistent int32 = 0

	// The node is created with the world ACL built from zk.PermAll.
	created, createErr := p.conn.Create(path, []byte{}, persistent, zk.WorldACL(zk.PermAll))
	switch createErr {
	case nil:
		return created, nil
	case zk.ErrNodeExists:
		return path, nil
	default:
		return created, createErr
	}
}
// Publish replaces the data stored in the topic node with data.
//
// The node is read first to obtain its current version, which is then passed
// to Set as the expected version. An error from either the read or the write
// is returned to the caller; a failed write is also printed to standard
// output.
func (p *Publisher) Publish(topic, data string) (err error) {
	// The read error must be checked before stat is used: when Get fails
	// (missing node, closed connection, ...) stat carries no valid version and
	// may be nil, so dereferencing it would panic or send a bogus version.
	_, stat, err := p.conn.Get(topic)
	if err != nil {
		return err
	}

	if _, err = p.conn.Set(topic, []byte(data), stat.Version); err != nil {
		fmt.Println("err", err)
		return err
	}
	return nil
}
// DeleteTopic removes the topic node.
//
// The node is read first to obtain its current version, which is passed to
// Delete as the expected version. An error from either the read or the delete
// is returned to the caller.
func (p *Publisher) DeleteTopic(topic string) (err error) {
	// Check the read error before touching stat: after a failed Get the stat
	// value holds no valid version and may be nil.
	_, stat, err := p.conn.Get(topic)
	if err != nil {
		return err
	}
	return p.conn.Delete(topic, stat.Version)
}
// Subscriber wraps a ZooKeeper connection and uses it to watch a topic node
// for changes.
type Subscriber struct {
// conn is the ZooKeeper client connection used to set watches and read data.
conn *zk.Conn
}
// NewSubscriber connects to the ZooKeeper servers listed in hosts, passing
// time.Second as the timeout argument of zk.Connect, and returns a Subscriber
// that owns the resulting connection.
//
// The second value returned by zk.Connect is discarded. If zk.Connect reports
// an error the function panics with it.
func NewSubscriber(hosts []string) *Subscriber {
	c, _, connErr := zk.Connect(hosts, time.Second)
	if connErr != nil {
		panic(connErr)
	}
	return &Subscriber{conn: c}
}
// subscribe watches the topic node and prints what happens to it until the
// node is deleted.
//
// Each iteration registers a watch with ExistsW, prints the existence flag and
// stat it returned, then blocks until one event arrives on the watch channel:
//   - node deleted: a message is printed and subscribe returns nil;
//   - node data changed: the node is read and its data printed;
//   - anything else: the event type is printed.
//
// After every event other than deletion the loop starts over and sets a new
// watch. An error from ExistsW ends the loop and is returned.
func (s *Subscriber) subscribe(topic string) error {
	for {
		isExist, stat, ch, err := s.conn.ExistsW(topic)
		if err != nil {
			return err
		}
		fmt.Println("isExist", isExist, stat)

		// Block until the watch delivers its event.
		event := <-ch
		switch event.Type {
		case zk.EventNodeDeleted:
			fmt.Println(topic, "node delete")
			return nil
		case zk.EventNodeDataChanged:
			data, _, getErr := s.conn.Get(topic)
			if getErr != nil {
				// Report the failed read and go back to watching; printing
				// data here would show an empty payload as if it were real.
				fmt.Println("err", getErr)
				continue
			}
			fmt.Println("data", data)
		default:
			fmt.Println("event type", event.Type)
		}
	}
}