分享
GO ZK WATCH监听
mingzhehaolove · 5169 次点击 · 开始浏览
这是一篇此前创建的文章,其中的信息可能已经有所发展或是发生改变。
package main
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"strings"
"time"
)
// must panics with err when err is non-nil and does nothing otherwise.
// It is used by this demo to abort on any unexpected failure.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// connect opens a connection to the server list "127.0.0.1:2181"
// (comma-separated, a single entry here), passing time.Second as the
// timeout argument of zk.Connect. The second value returned by zk.Connect
// is discarded. Any connection error causes a panic via must.
func connect() *zk.Conn {
	const serverList = "127.0.0.1:2181"
	c, _, err := zk.Connect(strings.Split(serverList, ","), time.Second)
	must(err)
	return c
}
// mirror starts a goroutine that follows the children of path on conn.
//
// On every iteration the goroutine calls conn.ChildrenW, sends the returned
// child list on the first result channel, and then waits for the event
// delivered on the watch channel before looping to fetch the list again.
// If ChildrenW fails, or the received event carries a non-nil Err, that
// error is sent on the second result channel and the goroutine exits.
//
// Both channels are unbuffered and are never closed, so the caller has to
// keep receiving from them for the goroutine to make progress.
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
	childLists := make(chan []string)
	failures := make(chan error)

	// watchOnce publishes the current child list and blocks until the
	// watch set by ChildrenW delivers an event; it returns the first
	// error encountered, or nil when the loop should continue.
	watchOnce := func() error {
		children, _, watch, err := conn.ChildrenW(path)
		if err != nil {
			return err
		}
		childLists <- children
		return (<-watch).Err
	}

	go func() {
		for {
			if err := watchOnce(); err != nil {
				failures <- err
				return
			}
		}
	}()

	return childLists, failures
}
// main demonstrates watching the children of "/mirror".
//
// One connection creates "/mirror" (a failure is only printed, not fatal),
// starts mirror on it, and prints every child list or error it receives.
// A second connection then creates "/mirror/one" and "/mirror/two" with
// zk.FlagEphemeral, the first connection deletes "/mirror/two", the second
// creates "/mirror/three", and finally the second connection is closed.
// Each step is separated by a one-second sleep.
func main() {
	watcher := connect()
	defer watcher.Close()

	ephemeral := int32(zk.FlagEphemeral)
	openACL := zk.WorldACL(zk.PermAll)

	// The parent node is created with flags 0; an error here is reported
	// but does not stop the demo.
	if _, err := watcher.Create("/mirror", nil, int32(0), openACL); err != nil {
		fmt.Printf("create: %+v\n", err)
	}

	childLists, failures := mirror(watcher, "/mirror")
	go func() {
		for {
			select {
			case e := <-failures:
				fmt.Printf("%+v\n", e)
			case children := <-childLists:
				fmt.Printf("%+v\n", children)
			}
		}
	}()

	writer := connect()

	// pause sleeps for one second between steps.
	pause := func() { time.Sleep(time.Second) }
	// createEphemeral creates node on the writer connection, storing name
	// as its data, and panics on failure.
	createEphemeral := func(node, name string) {
		_, err := writer.Create(node, []byte(name), ephemeral, openACL)
		must(err)
	}

	pause()
	createEphemeral("/mirror/one", "one")
	pause()
	createEphemeral("/mirror/two", "two")
	pause()
	must(watcher.Delete("/mirror/two", 0))
	pause()
	createEphemeral("/mirror/three", "three")
	pause()
	writer.Close()
	pause()
}
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889
关注微信5169 次点击
下一篇:Pockmon go安装遇到的坑
添加一条新回复
(您需要登录后才能回复;没有账号?)
- 请尽量让自己的回复能够对别人有帮助
- 支持 Markdown 格式:**粗体**、~~删除线~~、`单行代码`
- 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
- 图片支持拖拽、截图粘贴等方式上传
收入到我管理的专栏 新建专栏
package main
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"strings"
"time"
)
// must panics with err when err is non-nil and does nothing otherwise.
// It is used by this demo to abort on any unexpected failure.
func must(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// connect opens a connection to the server list "127.0.0.1:2181"
// (comma-separated, a single entry here), passing time.Second as the
// timeout argument of zk.Connect. The second value returned by zk.Connect
// is discarded. Any connection error causes a panic via must.
func connect() *zk.Conn {
	const serverList = "127.0.0.1:2181"
	c, _, err := zk.Connect(strings.Split(serverList, ","), time.Second)
	must(err)
	return c
}
// mirror starts a goroutine that follows the children of path on conn.
//
// On every iteration the goroutine calls conn.ChildrenW, sends the returned
// child list on the first result channel, and then waits for the event
// delivered on the watch channel before looping to fetch the list again.
// If ChildrenW fails, or the received event carries a non-nil Err, that
// error is sent on the second result channel and the goroutine exits.
//
// Both channels are unbuffered and are never closed, so the caller has to
// keep receiving from them for the goroutine to make progress.
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
	childLists := make(chan []string)
	failures := make(chan error)

	// watchOnce publishes the current child list and blocks until the
	// watch set by ChildrenW delivers an event; it returns the first
	// error encountered, or nil when the loop should continue.
	watchOnce := func() error {
		children, _, watch, err := conn.ChildrenW(path)
		if err != nil {
			return err
		}
		childLists <- children
		return (<-watch).Err
	}

	go func() {
		for {
			if err := watchOnce(); err != nil {
				failures <- err
				return
			}
		}
	}()

	return childLists, failures
}
// main demonstrates watching the children of "/mirror".
//
// One connection creates "/mirror" (a failure is only printed, not fatal),
// starts mirror on it, and prints every child list or error it receives.
// A second connection then creates "/mirror/one" and "/mirror/two" with
// zk.FlagEphemeral, the first connection deletes "/mirror/two", the second
// creates "/mirror/three", and finally the second connection is closed.
// Each step is separated by a one-second sleep.
func main() {
	watcher := connect()
	defer watcher.Close()

	ephemeral := int32(zk.FlagEphemeral)
	openACL := zk.WorldACL(zk.PermAll)

	// The parent node is created with flags 0; an error here is reported
	// but does not stop the demo.
	if _, err := watcher.Create("/mirror", nil, int32(0), openACL); err != nil {
		fmt.Printf("create: %+v\n", err)
	}

	childLists, failures := mirror(watcher, "/mirror")
	go func() {
		for {
			select {
			case e := <-failures:
				fmt.Printf("%+v\n", e)
			case children := <-childLists:
				fmt.Printf("%+v\n", children)
			}
		}
	}()

	writer := connect()

	// pause sleeps for one second between steps.
	pause := func() { time.Sleep(time.Second) }
	// createEphemeral creates node on the writer connection, storing name
	// as its data, and panics on failure.
	createEphemeral := func(node, name string) {
		_, err := writer.Create(node, []byte(name), ephemeral, openACL)
		must(err)
	}

	pause()
	createEphemeral("/mirror/one", "one")
	pause()
	createEphemeral("/mirror/two", "two")
	pause()
	must(watcher.Delete("/mirror/two", 0))
	pause()
	createEphemeral("/mirror/three", "three")
	pause()
	writer.Close()
	pause()
}