Zookeeper 客户端 API 操作

一、环境搭建

前提:保证三个节点服务器上 Zookeeper 集群服务端启动。

1.1、创建工程:zookeeper

1.2、引入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.5.7</version>
  </dependency>

  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.13.2</version>
    <scope>compile</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.19.0</version>
  </dependency>
</dependencies>

1.3、创建com.hudu.ZkClient

二、Zookeeper API 操作

2.1、创建 Zookeeper 客户端

public class ZkClient {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";

    private static final int sessionTimeout = 2000;

    private ZooKeeper zkClient = null;

    @Before
    public void init() throws IOException {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 收到事件通知后的回调函数
                logger.info("{} -- {}", event.getType(), event.getPath());
                // 再次启动监听
                try {
                    List<String> children = zkClient.getChildren("/", true);
                    for (String child : children) {
                        logger.info(child);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

2.2、创建子节点

@Test
public void create() throws Exception {
  String nodeCreated = zkClient.create("/test", "test".getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

回调函数输出为

None -- null
zookeeper
test

通过 zk 客户端查看创建的节点的情况

[zk: 127.0.0.1:2181(CONNECTED) 0] get /test
test

2.3、获取子节点并监听节点变化

@Test
public void getChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
    logger.info(child);
    }
    // 延时阻塞
    TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
}

此时通过 zk 客户端对 zookeeper 进行操作

[zk: 127.0.0.1:2182(CONNECTED) 4] create /test1 "test1"
Created /test1
[zk: 127.0.0.1:2182(CONNECTED) 5] delete /test1

分别输出

NodeChildrenChanged -- /
zookeeper
test
test1
NodeChildrenChanged -- /
zookeeper
test

2.4、判断 Znode 是否存在

@Test
public void exist() throws Exception {
    Stat stat = zkClient.exists("/test", false);

    logger.info(stat == null ? "not exist" : "exist");
}

三、客户端向服务端写数据流程

3.1、Zookeeper 写入请求直接发送给 Leader

Zookeeper 写入请求直接发送给 Leader

当请求直接发送给 Leader 后,Leader 先进行 Write 操作,然后通知 Follower 进行写操作,当有超过半数的 Follower 完成 Write操作之后,通知 Client 写入操作完成

3.2、Zookeeper 写入请求发送给 follower

Zookeeper 写入请求直接发送给 Leader

当请求发送给 Follower 之后,Follower 通知 Leader,Leader进行写操作,然后 Leader 通知 Follower 进行 Write,当有半数节点 ack,即超过半数节点写入完成,Leader 通知 Follower 写入完成,然后 Follower 通过 Client 写入完成。

四、服务器动态上下线监听案例

4.1、架构图如下

Zookeeper 实现服务器动态上下线监听

4.2、具体实现

1、在集群上创建 /server 节点

[zk: 127.0.0.1:2182(CONNECTED) 0] create /servers "servers"
Created /servers

工具类代码如下

public class ZKTools {

    private final Logger logger = LoggerFactory.getLogger(ZKTools.class);

    private ZooKeeper zk;

    private static final String SERVER_PATH_PREFIX = "/servers";

    /**
     * 服务端连接
     */
    public void getServerConnect() throws IOException {
        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        int sessionTimeout = 2000;
        zk = new ZooKeeper(connectString, sessionTimeout, null);
    }

    /**
     * 客户端连接,注册 server 节点监听
     */
    public void getClientConnect() throws IOException {
        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        int sessionTimeout = 2000;
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                try {
                    getServerList();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }

    /**
     * 注册服务
     */
    public void registerServer(String hostname) throws KeeperException, InterruptedException {
        String created = zk.create(SERVER_PATH_PREFIX + "/" + hostname, hostname.getBytes(StandardCharsets.UTF_8), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    /**
     * 服务端业务
     */
    public void serverBusiness(String hostname) throws InterruptedException {
        logger.info("{} is online", hostname);
        TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
    }

    /**
     * 获取子节点信息,并监听父节点
     */
    public void getServerList() throws KeeperException, InterruptedException {
        // 获取子节点,并对父节点进行监听
        List<String> children = zk.getChildren(SERVER_PATH_PREFIX, true);
        List<String> servers = new ArrayList<>();
        for (String child : children) {
            byte[] data = zk.getData(SERVER_PATH_PREFIX + "/" + child, false, null);
            servers.add(new String(data));
        }
        logger.info(servers.toString());
    }

    /**
     * 客户端业务
     */
    public void clientBusiness() throws InterruptedException {
        logger.info("client is working ... ");
        TimeUnit.MILLISECONDS.sleep(Long.MAX_VALUE);
    }
}

服务端代码

public class DistributeServer {
    public static void main(String[] args) throws Exception {
        ZKTools zkTools = new ZKTools();
        zkTools.getServerConnect();
        zkTools.registerServer(args[0]);
        zkTools.serverBusiness(args[0]);
    }
}

客户端代码

public class DistributeClient {
    public static void main(String[] args) throws Exception {
        ZKTools zkTools = new ZKTools();
        zkTools.getClientConnect();
        zkTools.clientBusiness();
    }
}

修改 DistributeServer,传入 server1 参数

Zookeeper 客户端 API 操作

DistributeClient 输出如下

[server1]

五、Zookeeper 分布式锁案例

5.1、原生 Zookeeper 分布式锁实现

public class DistributedLock {

    Logger logger = LoggerFactory.getLogger(DistributedLock.class);

    private ZooKeeper zk;

    // Zookeeper 连接
    private CountDownLatch connectLatch = new CountDownLatch(1);

    // Zookeeper 节点等待
    private CountDownLatch waitLatch = new CountDownLatch(1);

    private String rootNode = "locks";
    private String subNode = "seq-";
    // 当前 client 创建的子节点
    private String currentNode;

    // 当前 client 等待的子节点
    private String waitPath;

    /**
     * 与 zk 服务创建连接
     */
    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        int sessionTimeout = 2000;
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 连接建立时,打开 latch,唤醒 wait 在该 latch 上的线程
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectLatch.countDown();
                }

                // 发生了 waitPath 删除事件
                if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitLatch.countDown();
                }
            }
        });

        // 等待连接建立
        connectLatch.await();

        // 获取根节点状态
        Stat stat = zk.exists("/" + rootNode, false);

        // 如果节点不存在,则创建节点,根节点类型为永久节点
        if (stat == null) {
            logger.info("根节点不存在");

            zk.create("/" + rootNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    /**
     * 加锁方法
     */
    public void zkLock() {
        try {
            // 在根节点下创建临时顺序节点,返回值为创建的节点路径
            currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 注意,没必要监听 "/locks" 的子节点的变化情况
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);

            // 列表中只有一个子节点,那肯定就是 currentNode,说明 client 获得锁
            if (childrenNodes.size() == 1) {
                return;
            } else {
                // 对根节点下的所有临时顺序节点进行从小到大排序
                Collections.sort(childrenNodes);

                // 当前节点的名称
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());

                // 获取当前节点的位置
                int index = childrenNodes.indexOf(thisNode);

                if (index == -1) {
                    logger.error("数据异常");
                } else if (index == 0) {
                    // 说明 thisNode 在列表中最小,当前 client 获得锁
                    return;
                } else {
                    // 获得排名比 currentNode 前一位的节点
                    waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);

                    // 在 waitPath 上注册监听器,当 waitPath 被删除时,zookeeper 会回调监听器的 process 方法
                    zk.getData(waitPath, true, new Stat());

                    // 进入等待锁状态
                    waitLatch.await();

                    return;
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 解锁方法
     */
    public void zkUnlock() {
        try {
            zk.delete(currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

}

测试类

public class DistributedLockTest {

    public static void main(String[] args) throws InterruptedException, IOException, KeeperException {

        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                lock1.zkLock();
                System.out.println("线程1 获取锁");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock1.zkUnlock();
                System.out.println("线程1 释放锁");
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 获取锁对象
                lock2.zkLock();
                System.out.println("线程2 获取锁");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                lock2.zkUnlock();
                System.out.println("线程2 释放锁");
            }
        }).start();
    }
}

运行结果如下

线程1 获取锁
线程1 释放锁
线程2 获取锁
线程2 释放锁

5.2、Curator 框架实现分布式锁案例

5.2.1、原生 JAVA API 开发存在的问题

  1. 会话是异步的,需要自己去处理,比如使用 CountDownLatch
  2. Watch 需要重复注册,不然就不能生效
  3. 开发的复杂性还是比较高
  4. 不支持多节点删除和创建。需要自己递归

5.2.2、Curator 优势

Curator 是一个专门解决分布式锁的框架,解决了原生 Java API 开发分布式锁遇到的问题,详情查看官方文档

Curator 官方文档地址

5.2.3、Curator 实际操作

1、引入依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.3.0</version>
</dependency>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>5.3.0</version>
</dependency>

2、代码实现

public class CuratorLockTest {

    private String rootNode;
    private String connectString;

    public static void main(String[] args) {
        new CuratorLockTest().test();
    }

    private void test() {
        // 创建分布式锁
        rootNode = "/locks";
        InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
        InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);

        new Thread(()->{
            try {
                lock1.acquire();
                System.out.println("线程1 获取锁");
                // 测试重入锁
                lock1.acquire();
                System.out.println("线程1 再次获取锁");
                Thread.sleep(3000);
                lock1.release();
                System.out.println("线程1 释放锁");
                lock1.release();
                System.out.println("线程1 再次释放锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(()->{
            try {
                lock2.acquire();
                System.out.println("线程2 获取锁");
                // 测试重入锁
                lock2.acquire();
                System.out.println("线程2 再次获取锁");
                Thread.sleep(3000);
                lock2.release();
                System.out.println("线程2 释放锁");
                lock2.release();
                System.out.println("线程2 再次释放锁");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    /**
     * 分布式锁初始化
     */
    private CuratorFramework getCuratorFramework() {
        // 重试策略,重试间隔三秒,重试三次
        ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
        connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
        int connectionTimeoutMs = 2000;
        int sessionTimeoutMs = 2000;
        // 通过工厂创建 Curator
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(connectionTimeoutMs)
                .sessionTimeoutMs(sessionTimeoutMs)
                .retryPolicy(policy).build();

        // 开启连接
        client.start();
        System.out.println("zookeeper 初始化完成 。。。");
        return client;
    }
}

测试结果如下:

线程1 获取锁
线程1 再次获取锁
线程1 释放锁
线程1 再次释放锁
线程2 获取锁
线程2 再次获取锁
线程2 释放锁
线程2 再次释放锁
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商