Zookeeper 客户端 API
引入依赖#
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
创建连接#
接口说明#
ZooKeeper
构造函数说明
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
public ZooKeeper(String connectString,
int sessionTimeout,
Watcher watcher,
boolean canBeReadOnly);
public ZooKeeper(String connectString,
int sessionTimeout,
Watcher watcher,
long sessionId,
byte[] sessionPasswd);
ZooKeeper(String connectString,
int sessionTimeout,
Watcher watcher,
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly);
参数解释
connectString :zk 服务器列表,由英文逗号分开的字符串,
例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;
sessionTimeout :会话超时时间,以毫秒为单位。在一个会话周期内,zk 客户端和服务端通过心跳来检查连接的有效性,一旦在 sessionTimeout 时间内没有进行心跳检测,则会话失效。
watcher: zk 允许客户端在构造方法中传入一个 Watcher 接口实现类作为事件通知处理器。
canBeReadOnly :在 zk 集群模式中,如果一台集群和集群中过半以上的机器都都失去了网络连接,那么这个机器将不再处理客户端请求,包括读写请求。但在某些情况下出现类似问题,我们希望该台机器能够处理读请求,此时为 read-only 模式。
sessionId、sessionPasswd :利用 sessionId 和 sessionPasswd 确保复用会话连接。
代码#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 关闭 zk
zk.close();
}
}
创建节点#
接口说明#
// 同步创建节点
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode);
// 异步创建节点
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx);
参数解释
String path:node 的路径
byte data[]:初始化 node 数据
List acl:node 的 acl 权限
CreateMode createMode:指定 node 类型,共有四种类型,如下所示
CreateMode.PERSISTENT:持久化节点,客户端连接断开后不会被自动删除
CreateMode.PERSISTENT_SEQUENTIAL:持久化顺序节点
CreateMode.EPHEMERAL:临时节点,客户端连接断开后会被自动删除
CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点
StringCallback cb:异步回调函数
Object ctx:用于传递上下文信息
同步创建节点#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
String path1 = zk.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("created path success : " + path1);
// 临时顺序节点:会自动的在节点路径后加一个数字,例如返回值:/ zk-test0000000001
String path2 = zk.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("created path success : " + path2);
// 关闭 zk
zk.close();
}
}
异步创建节点#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
System.out.println(zk.getSessionId());
System.out.println(zk.getSessionPasswd());
/********************************************************************************************/
// 持久化节点:创建接口返回该节点路径,无返回值;异步创建的节点:/zk-test
zk.create("/zk-test",
"".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
new CallBack(),
"PERSISTENT");
//持久化顺序节点:会自动的在节点路径后加一个数字,该方法无返回值,创建后节点:/zk-test-seq0000000002
zk.create("/zk-test-seq",
"".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL,
new CallBack(),
"PERSISTENT_SEQUENTIAL");
Thread.sleep(1000);
// 关闭 zk
zk.close();
}
static class CallBack implements AsyncCallback.StringCallback {
/**
* 服务端回调方法
*
* @param code 服务端响应码:0 接口调用成功;-4 客户端和服务端连接断开;-110 节点已存在 ;-112 会话过期
* @param path 创建节点传入的路径参数
* @param ctx 异步创建api传入的ctx参数
* @param name 服务端真正创建节点的名称,业务逻辑应该以该值为准
*/
@Override
public void processResult(int code, String path, Object ctx, String name) {
System.out.println("created success : code = " + code + ",path = " + path + ",ctx = " + ctx + ",name = " + name);
}
}
}
获取节点数据#
接口说明#
// 同步获取
public byte[] getData(String path, boolean watch, Stat stat);
public byte[] getData(final String path, Watcher watcher, Stat stat);
// 异步获取
public void getData(String path, boolean watch, DataCallback cb, Object ctx);
public void getData(final String path, Watcher watcher,
DataCallback cb, Object ctx);
参数解释
String path:node 的路径
boolean watch:如果 watch 为 true,该 znode 的状态变化会发送给构建 Zookeeper 指定的 watcher
Stat stat:节点的状态信息,有时候我们不仅需要最新的子节点列表,还要获取这个节点的最新状态信息,我们可以将一个旧的 stat 传入到 api 方法中,在方法执行过程中 stat 会被来自服务的新的 stat 替换掉。
Watcher watcher:watcher 用来监听该 znode 的状态变化
DataCallback cb:异步回调函数
Object ctx:用于传递上下文信息
同步获取数据#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("created path success : " + path1);
// byte[] data = zk.getData("/zk-data", true, null);
// System.out.println("data = " + new String(data));
/**
* 也可以通过如下方式进行监听
*/
byte[] data = zk.getData("/zk-data", new Watcher() {
// 当 /zk-data 节点信息数据改变时将会触发这里
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
}
}, null);
System.out.println("data = " + new String(data));
// 关闭 zk
zk.close();
}
}
修改节点数据#
接口说明#
// 同步设置
// 如果 version 为 -1,则表示无无条件更新。
//
Stat setData(final String path, byte data[], int version);
// 异步设置
public void setData(final String path, byte data[], int version,
StatCallback cb, Object ctx);
参数解释
String path:node 的路径
byte data[]:更新数据
int version:如果 version 为 -1,则表示无无条件更新;否则只有给定的 version 和 znode 当前的 version 一样才会进行更新
StatCallback cb:异步回调函数
Object ctx:于传递上下文信息
同步修改数据#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
System.out.println("created path success : " + path1);
// 获取数据
byte[] data = zk.getData("/zk-data", new Watcher() {
// 当 /zk-data 节点信息数据改变时将会触发这里
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
}
}, null);
System.out.println("data = " + new String(data));
// 修改数据
zk.setData("/zk-data","test-data-update".getBytes(), -1);
// 获取数据
byte[] updateData = zk.getData("/zk-data", true, null);
System.out.println("updateData = " + new String(updateData));
// 关闭 zk
zk.close();
}
}
删除节点#
接口说明#
// 同步删除
public void delete(final String path, int version);
// 异步删除
public void delete(final String path, int version, VoidCallback cb,
Object ctx)
参数解释
String path:node 的路径
int version:如果 version 为 -1,则表示无无条件删除;否则只有给定的 version 和 znode 当前的 version 一样才会进行删除
VoidCallbackcb:异步回调函数
Object ctx:于传递上下文信息
同步删除节点#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 创建持久化节点
String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("created path success : " + path1);
zk.delete("/zk-data",-1);
// 关闭 zk
zk.close();
}
}
判断节点是否存在#
接口说明#
public Stat exists(String path, boolean watch);
public Stat exists(final String path, Watcher watcher);
public void exists(String path, boolean watch, StatCallback cb, Object ctx);
public void exists(final String path, Watcher watcher,
StatCallback cb, Object ctx);
参数解释
这列的参数跟前面的分析是类似的
代码实践#
public class Test {
// 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
// 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
// 用于等待zk服务端通知
private static CountDownLatch latch = new CountDownLatch(1);
private static ZooKeeper zk = null;
// 如果有多个地址,则使用逗号隔开
private static final String connString="127.0.0.1:2181";
// 超时时间
private static final int sessionTimeout = 2000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
if (Event.KeeperState.SyncConnected == event.getState()) {
latch.countDown();
}
}
});
latch.await();
// 创建持久化节点
String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat1 = zk.exists("/zk-data", true);
System.out.println(stat1);
zk.delete("/zk-data",-1);
Stat stat2 = zk.exists("/zk-data", false);
System.out.println(stat2);
// 关闭 zk
zk.close();
}
}
设置监听事件 API#
public byte[] getData(String path, boolean watch, Stat stat);
public byte[] getData(final String path, Watcher watcher, Stat stat);
public Stat exists(String path, boolean watch);
public Stat exists(final String path, Watcher watcher);
boolean watch:如果 watch 为 true,该 znode 的状态变化会发送给构建 Zookeeper 指定的 watcher
Watcher watcher:watcher 用来监听该 znode 的状态变化
本作品采用《CC 协议》,转载必须注明作者和本文链接
设置完 acl 权限后 如何使用 kafka 进行连接呢