Zookeeper 客户端 API

引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

创建连接

接口说明

ZooKeeper构造函数说明

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);

public ZooKeeper(String connectString, 
                 int sessionTimeout,
                 Watcher watcher,
                boolean canBeReadOnly);

public ZooKeeper(String connectString, 
                 int sessionTimeout, 
                 Watcher watcher,
                 long sessionId, 
                 byte[] sessionPasswd);


ZooKeeper(String connectString, 
          int sessionTimeout, 
          Watcher watcher,
          long sessionId, 
          byte[] sessionPasswd, 
          boolean canBeReadOnly);

参数解释

connectString :zk服务器列表,由英文逗号分开的字符串,

例如:127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183;

sessionTimeout :会话超时时间,以毫秒为单位。在一个会话周期内,zk客户端和服务端通过心跳来检查连接的有效性,一旦在sessionTimeout时间内没有进行心跳检测,则会话失效。

watcher: zk允许客户端在构造方法中传入一个Watcher接口实现类作为事件通知处理器。

canBeReadOnly :在zk集群模式中,如果一台集群和集群中过半以上的机器都都失去了网络连接,那么这个机器将不再处理客户端请求,包括读写请求。但在某些情况下出现类似问题,我们希望该台机器能够处理读请求,此时为 read-only 模式。

sessionId、sessionPasswd :利用sessionId 和 sessionPasswd 确保复用会话连接。

代码

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。       
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();
        // 关闭 zk
        zk.close();
    }
}

创建节点

接口说明

// 同步创建节点
public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode);

// 异步创建节点
public void create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode,  StringCallback cb, Object ctx);

参数解释

String path:node的路径

byte data[]:初始化node数据

List acl:node的acl权限

CreateMode createMode:指定node类型,共有四种类型,如下所示

  1. CreateMode.PERSISTENT:持久化节点,客户端连接断开后不会被自动删除

  2. CreateMode.PERSISTENT_SEQUENTIAL:持久化顺序节点

  3. CreateMode.EPHEMERAL:临时节点,客户端连接断开后会被自动删除

  4. CreateMode.EPHEMERAL_SEQUENTIAL:临时顺序节点

StringCallback cb:异步回调函数

Object ctx:用于传递上下文信息

同步创建节点

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();

        // 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
        String path1 = zk.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("created path success : " + path1);

        // 临时顺序节点:会自动的在节点路径后加一个数字,例如返回值:/ zk-test0000000001
        String path2 = zk.create("/zk-test", "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("created path success : " + path2);

        // 关闭 zk
        zk.close();
    }
}

异步创建节点

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();
        System.out.println(zk.getSessionId());
        System.out.println(zk.getSessionPasswd());
        /********************************************************************************************/

        // 持久化节点:创建接口返回该节点路径,无返回值;异步创建的节点:/zk-test
        zk.create("/zk-test",
                "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT,
                new CallBack(),
                "PERSISTENT");

        //持久化顺序节点:会自动的在节点路径后加一个数字,该方法无返回值,创建后节点:/zk-test-seq0000000002
        zk.create("/zk-test-seq",
                "".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL,
                new CallBack(),
                "PERSISTENT_SEQUENTIAL");

        Thread.sleep(1000);

        // 关闭 zk
        zk.close();
    }

    static class CallBack implements AsyncCallback.StringCallback {
        /**
         * 服务端回调方法
         *
         * @param code 服务端响应码:0 接口调用成功;-4 客户端和服务端连接断开;-110 节点已存在 ;-112 会话过期
         * @param path 创建节点传入的路径参数
         * @param ctx  异步创建api传入的ctx参数
         * @param name 服务端真正创建节点的名称,业务逻辑应该以该值为准
         */
        @Override
        public void processResult(int code, String path, Object ctx, String name) {
            System.out.println("created success : code = " + code + ",path = " + path + ",ctx = " + ctx + ",name = " + name);
        }
    }
}

获取节点数据

接口说明

// 同步获取
public byte[] getData(String path, boolean watch, Stat stat);
public byte[] getData(final String path, Watcher watcher, Stat stat);

// 异步获取
public void getData(String path, boolean watch, DataCallback cb, Object ctx);
public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx);

参数解释

String path:node的路径

boolean watch:如果 watch 为 true,该 znode 的状态变化会发送给构建 Zookeeper 指定的 watcher

Stat stat:节点的状态信息,有时候我们不仅需要最新的子节点列表,还要获取这个节点的最新状态信息,我们可以将一个旧的 stat 传入到api方法中,在方法执行过程中 stat 会被来自服务的新的 stat 替换掉。

Watcher watcher:watcher 用来监听该 znode 的状态变化

DataCallback cb:异步回调函数

Object ctx:用于传递上下文信息

同步获取数据

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();

        // 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
        String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("created path success : " + path1);


//        byte[] data = zk.getData("/zk-data", true, null);
//        System.out.println("data = " + new String(data));

        /**
         * 也可以通过如下方式进行监听
         */
        byte[] data = zk.getData("/zk-data", new Watcher() {
            // 当 /zk-data 节点信息数据改变时将会触发这里
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
            }
        }, null);
        System.out.println("data = " + new String(data));

        // 关闭 zk
        zk.close();
    }
}

修改节点数据

接口说明

// 同步设置
// 如果 version 为 -1,则表示无无条件更新。
// 
Stat setData(final String path, byte data[], int version);
// 异步设置
public void setData(final String path, byte data[], int version,
            StatCallback cb, Object ctx);

参数解释

String path:node的路径

byte data[]:更新数据

int version:如果 version 为 -1,则表示无无条件更新;否则只有给定的 version 和 znode 当前的 version 一样才会进行更新

StatCallback cb:异步回调函数

Object ctx:于传递上下文信息

同步修改数据

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();

        // 临时节点:创建接口返回该节点路径L,例如返回值 /zk-test
        String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        System.out.println("created path success : " + path1);

        // 获取数据
        byte[] data = zk.getData("/zk-data", new Watcher() {
            // 当 /zk-data 节点信息数据改变时将会触发这里
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
            }
        }, null);
        System.out.println("data = " + new String(data));

        // 修改数据
        zk.setData("/zk-data","test-data-update".getBytes(), -1);
        // 获取数据
        byte[] updateData = zk.getData("/zk-data", true, null);
        System.out.println("updateData = " + new String(updateData));

        // 关闭 zk
        zk.close();
    }
}

删除节点

接口说明

// 同步删除
public void delete(final String path, int version);
// 异步删除
public void delete(final String path, int version, VoidCallback cb,
            Object ctx)

参数解释

String path:node的路径

int version:如果 version 为 -1,则表示无无条件删除;否则只有给定的 version 和 znode 当前的 version 一样才会进行删除

VoidCallbackcb:异步回调函数

Object ctx:于传递上下文信息

同步删除节点

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();

        // 创建持久化节点
        String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        System.out.println("created path success : " + path1);

        zk.delete("/zk-data",-1);
        // 关闭 zk
        zk.close();
    }
}

判断节点是否存在

接口说明

public Stat exists(String path, boolean watch);
public Stat exists(final String path, Watcher watcher);

public void exists(String path, boolean watch, StatCallback cb, Object ctx);
public void exists(final String path, Watcher watcher,
            StatCallback cb, Object ctx);

参数解释

这列的参数跟前面的分析是类似的

代码实践

public class Test {
    // 客户端 和 服务端 创建会话的过程是异步的。也就是客户度通过构造方法创建会话后立即返回,此时的连接并没有完全建立
    // 真正的会话建立完成后,zk服务端会给客户端通知一个事件,客户端获取通知之后在表明连接正在建立。
    // 用于等待zk服务端通知
    private static CountDownLatch latch = new CountDownLatch(1);

    private static ZooKeeper zk = null;
    // 如果有多个地址,则使用逗号隔开
    private static final String connString="127.0.0.1:2181";
    // 超时时间
    private static final int sessionTimeout = 2000;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("path = " + event.getPath() + ", eventType = " + event.getType() + ", keeperState = " + event.getState());
                if (Event.KeeperState.SyncConnected == event.getState()) {
                    latch.countDown();
                }
            }
        });
        latch.await();
        // 创建持久化节点
        String path1 = zk.create("/zk-data", "test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Stat stat1 = zk.exists("/zk-data", true);
        System.out.println(stat1);
        zk.delete("/zk-data",-1);
        Stat stat2 = zk.exists("/zk-data", false);
        System.out.println(stat2);
        // 关闭 zk
        zk.close();
    }
}

设置监听事件API

public byte[] getData(String path, boolean watch, Stat stat);
public byte[] getData(final String path, Watcher watcher, Stat stat);

public Stat exists(String path, boolean watch);
public Stat exists(final String path, Watcher watcher);

boolean watch:如果 watch 为 true,该 znode 的状态变化会发送给构建 Zookeeper 指定的 watcher

Watcher watcher:watcher 用来监听该 znode 的状态变化

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

设置完acl权限后 如何使用kafka进行连接呢

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!