章节索引 :

Zookeeper Watch

1. 前言

在我们使用 Zookeeper 来实现服务注册与发现、配置中心、分布式通知等功能时,需要使用 Zookeeper 的核心功能 Watch,来对节点进行监听。那么 Zookeeper 的 Watch 是如何实现对节点的监听,并响应事件到客户端的呢?我们就带着这个问题开始本节的内容。

2. Watch 的实现

我们在 Zookeeper 的数据模型这一小节中学习过 Znode 节点的类型和 Znode 节点的特点,这是 Zookeeper 核心特性之一。在大多数情况下,我们都会把 Znode 与 Watch 捆绑使用,接下来我们就使用 Zookeeper 的 Java 客户端 Curator 来实现 Watch 对 Znode 节点的监听。
我们可以继续使用我们在 Zookeeper Curator 这一节中创建的 Spring Boot 测试项目,在测试方法中对 Watch 进行实现。

2.1 CuratorWatcher

在我们使用 Curator 的 Fluent 风格进行链式调用时,我们可以使用 usingWatcher 来注册 CuratorWatcher 来对我们的节点变化事件进行监听:

@Test
void contextLoads() throws Exception {
    // 获取客户端
    CuratorFramework curatorClient = curatorService.getCuratorClient();
    // 开启会话
    curatorClient.start();
    // CuratorWatcher 为接口,我们需要实现 process 方法
    CuratorWatcher watcher = new CuratorWatcher(){
        @Override
        // 监听事件处理
        public void process(WatchedEvent watchedEvent) {
            // 输出 监听事件
            System.out.println(watchedEvent.toString());
        }
    };
    // 在命名空间下创建持久节点 mooc,内容为 Wiki
    curatorClient.create().forPath("/mooc","Wiki".getBytes());
    // 获取 mooc 节点的 data 数据,并对 mooc 节点开启监听
    byte[] bytes = curatorClient.getData().usingWatcher(watcher).forPath("/mooc");
    // 输出 data
    System.out.println(new String(bytes));
    // 第一次更新
    curatorClient.setData().forPath("/mooc", "Wiki001".getBytes());
    // 第二次更新
    curatorClient.setData().forPath("/mooc","Wiki002".getBytes());
}

控制台输出:

Wiki
WatchedEvent state:SyncConnected type:NodeDataChanged path:/mooc

控制台输出的第一行 Wiki 为 mooc 节点的 data 数据。第二行输出的 WatchedEvent 为监听到的事件,state 表示监听状态;type 表示监听到的事件类型,我们可以判断事件的类型来做相应的处理;path 表示监听的节点。
介绍完 WatchedEvent,我们发现控制台只输出了一次 WatchedEvent,也就是说 CuratorWatcher 只进行了一次监听。如果想要重复使用我们需要重新使用 usingWatcher 进行注册。那么有没有不需要重复注册的监听呢?接下来我们就来介绍 Curator 一种功能强大的监听 CuratorCacheListener。

2.2 CuratorCacheListener

CuratorCacheListener 是基于 CuratorCache 缓存实现的监听器,CuratorCache 对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。我们使用 CuratorCacheListener 时,需要使用构建器 CuratorCacheListenerBuilder 来对具体的事件监听进行构建,并且把 CuratorCacheListener 注册到 CuratorCache 缓存中。
首先我们需要构建 CuratorCache 缓存实例,在 CuratorCache 接口中,build 为静态方法,我们可以直接调用:

// 构建 CuratorCache 缓存实例
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
    return builder(client, path).withOptions(options).build();
}

我们来说明以下入参:CuratorFramework client 是 Curator 客户端;String path 是需要被监听的节点的路径;CuratorCache.Options... options 是对缓存设置的参数,我们可以设置以下 3 种:

public static enum Options {
    // 单节点缓存
    SINGLE_NODE_CACHE,
    // 对数据进行压缩
    COMPRESSED_DATA,
    // CuratorCache 关闭后不清除缓存
    DO_NOT_CLEAR_ON_CLOSE;
}

构建完缓存实例,我们再来构建 CuratorCacheListener ,在 CuratorCacheListener 接口中的构建方法 builder 为静态方法,我们可以直接调用:

// builder 方法,返回 CuratorCacheListenerBuilder 构建器,我们就可以使用具体的监听方法了
static CuratorCacheListenerBuilder builder() {
    return new CuratorCacheListenerBuilderImpl();
}

最后我们需要把 CuratorCacheListener 注册到 CuratorCache 中,并开启缓存:

// 注册 CuratorCacheListener 
cache.listenable().addListener(listener);
// 开启缓存
cache.start();

我们来看一个完整的例子:

@Test
void contextLoads() throws Exception {
    // 获取客户端
    CuratorFramework client = curatorService.getCuratorClient();
    // 开启会话
    client.start();
    // 构建 CuratorCache 实例
    CuratorCache cache = CuratorCache.build(client, "/mooc");
	// 使用 Fluent 风格和 lambda 表达式来构建 CuratorCacheListener 的事件监听
    CuratorCacheListener listener = CuratorCacheListener.builder()
        // 开启对所有事件的监听
        // type 事件类型:NODE_CREATED, NODE_CHANGED, NODE_DELETED;
        // oldNode 原节点:ChildData 类,包括节点路径,节点状态 Stat,节点 data
        // newNode 新节点:同上
       .forAll((type, oldNode, newNode) -> {
           System.out.println("forAll 事件类型:" + type);
           System.out.println("forAll 原节点:" + oldNode);
           System.out.println("forAll 新节点:" + newNode);
       })
        // 开启对节点创建事件的监听
        .forCreates(childData -> {
            System.out.println("forCreates 新节点:" + childData);
        })
        // 开启对节点更新事件的监听
        .forChanges((oldNode, newNode) -> {
            System.out.println("forChanges 原节点:" + oldNode);
            System.out.println("forChanges 新节点:" + newNode);
        })
        // 开启对节点删除事件的监听
        .forDeletes(oldNode -> {
            System.out.println("forDeletes 原节点:" + oldNode);
        })
        // 初始化
        .forInitialized(() -> {
            System.out.println("forInitialized 初始化");
        })
        // 构建
        .build();

    // 注册 CuratorCacheListener 到 CuratorCache
    cache.listenable().addListener(listener);
    // CuratorCache 开启缓存
    cache.start();
    // mooc 节点创建
    client.create().forPath("/mooc");
    // mooc 节点更新
    client.setData().forPath("/mooc","Wiki".getBytes());
    // mooc 节点删除
    client.delete().forPath("/mooc");
}

我们来查看 CuratorCacheListenerBuilder 接口中具体的事件监听,我们需要监听哪种事件就使用哪种方法:

public interface CuratorCacheListenerBuilder {
    // 全部事件
    CuratorCacheListenerBuilder forAll(CuratorCacheListener var1);
	// 创建事件
    CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1);
	// 更新事件
    CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1);
	// 创建和更新事件
    CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1);
	// 删除事件
    CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1);
	// 初始化后开启线程异步执行
    CuratorCacheListenerBuilder forInitialized(Runnable var1);
	// 子节点的事件
    CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3);
	// 节点本身的事件和子节点的事件
    CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2);
	// 节点本身的事件
    CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1);
 	// 初始化后开启监听
    CuratorCacheListenerBuilder afterInitialized();
	// 构建方法
    CuratorCacheListener build();
	/*
	* 更新事件时被调用
	*/
    @FunctionalInterface
    public interface ChangeListener {
        void event(ChildData var1, ChildData var2);
    }
}

接下来我们执行测试方法,查看控制台输出:

forInitialized 初始化
forAll 事件类型:NODE_CREATED
forAll 原节点:null
forAll 新节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forCreates 新节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 事件类型:NODE_CHANGED
forAll 原节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 新节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forChanges 原节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forChanges 新节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 事件类型:NODE_DELETED
forAll 原节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 新节点:null
forDeletes 原节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}

我们发现,我们设置的 create,setData,delete 这 3 种事件都被监听到了,而且 forAll 每一种事件都监听到了,所以我们在使用的时候,选择我们需要的事件监听即可。
介绍完 CuratorCacheListener 监听器,并完成了事件监听的测试,那么 Zookeeper 的 Watch 是如何运行的呢?接下来我们就来介绍 Watch 的运行原理。

3. Watch 的原理

在介绍 Watch 的原理之前,我们先熟悉一个概念:Zookeeper 客户端对 Znode 的写操作,也就是新增节点、更新节点、删除节点这些操作,默认会开启监听;Zookeeper 客户端对 Znode 的读操作,也就是查询节点数据、查询节点是否存在、查询子节点等操作,需要手动设置开启监听。这也是为什么在 GetDataRequest 请求体中会有 watch 这个属性的原因。
Watch 的运行过程分为 4 部分,分别是:客户端注册 Watch 、服务端注册 Watch、服务端触发 Watch、客户端处理回调。
Watch 的运行过程

  • 客户端注册 Watch
    当我们使用 Zookeeper 客户端向 Zookeeper 服务端发送带有事件监听的请求时,Zookeeper 客户端会把该请求标记成带有 Watch 的请求,然后把 Watch 监听器注册到 ListenerManager 中。

  • 服务端注册 Watch
    Zookeeper 服务端接收到 Zookeeper 客户端发送过来的请求,解析请求体,判断该请求是否带有 Watch 事件,如果有 Watch 事件,就会把 Watch 事件注册到 WatchManager 中。

  • 服务端触发 Watch
    Zookeeper 服务端注册完 Watch 事件后,会调用 WatchManager 的 triggerWatch 方法来触发 Watch 事件,Watch 事件完成后,向客户端发送响应。

  • 客户端处理回调
    Zookeeper 客户端接收到 Zookeeper 服务端的响应后,解析响应体,根据响应体的类型去 ListenerManager 中查找相对应的 Watch 监听器,然后触发监听器的回调函数。

4. 总结

在本节中,我们学习了使用 Curator 的两种方式开启对事件的监听,也了解了 Watch 运行过程的 4 个部分。以下是本节内容的总结:

  1. Zookeeper Watch 的实现。
  2. Zookeeper Watch 的原理。