Zookeeper Watch
1. 前言
在我们使用 Zookeeper 来实现服务注册与发现、配置中心、分布式通知等功能时,需要使用 Zookeeper 的核心功能 Watch,来对节点进行监听。那么 Zookeeper 的 Watch 是如何实现对节点的监听,并响应事件到客户端的呢?我们就带着这个问题开始本节的内容。
2. Watch 的实现
我们在 Zookeeper 的数据模型这一小节中学习过 Znode 节点的类型和 Znode 节点的特点,这是 Zookeeper 核心特性之一。在大多数情况下,我们都会把 Znode 与 Watch 捆绑使用,接下来我们就使用 Zookeeper 的 Java 客户端 Curator 来实现 Watch 对 Znode 节点的监听。
我们可以继续使用我们在 Zookeeper Curator 这一节中创建的 Spring Boot 测试项目,在测试方法中对 Watch 进行实现。
2.1 CuratorWatcher
在我们使用 Curator 的 Fluent 风格进行链式调用时,我们可以使用 usingWatcher 来注册 CuratorWatcher 来对我们的节点变化事件进行监听:
@Test
void contextLoads() throws Exception {
// 获取客户端
CuratorFramework curatorClient = curatorService.getCuratorClient();
// 开启会话
curatorClient.start();
// CuratorWatcher 为接口,我们需要实现 process 方法
CuratorWatcher watcher = new CuratorWatcher(){
@Override
// 监听事件处理
public void process(WatchedEvent watchedEvent) {
// 输出 监听事件
System.out.println(watchedEvent.toString());
}
};
// 在命名空间下创建持久节点 mooc,内容为 Wiki
curatorClient.create().forPath("/mooc","Wiki".getBytes());
// 获取 mooc 节点的 data 数据,并对 mooc 节点开启监听
byte[] bytes = curatorClient.getData().usingWatcher(watcher).forPath("/mooc");
// 输出 data
System.out.println(new String(bytes));
// 第一次更新
curatorClient.setData().forPath("/mooc", "Wiki001".getBytes());
// 第二次更新
curatorClient.setData().forPath("/mooc","Wiki002".getBytes());
}
控制台输出:
Wiki
WatchedEvent state:SyncConnected type:NodeDataChanged path:/mooc
控制台输出的第一行 Wiki 为 mooc 节点的 data 数据。第二行输出的 WatchedEvent 为监听到的事件,state 表示监听状态;type 表示监听到的事件类型,我们可以判断事件的类型来做相应的处理;path 表示监听的节点。
介绍完 WatchedEvent,我们发现控制台只输出了一次 WatchedEvent,也就是说 CuratorWatcher 只进行了一次监听。如果想要重复使用我们需要重新使用 usingWatcher 进行注册。那么有没有不需要重复注册的监听呢?接下来我们就来介绍 Curator 一种功能强大的监听 CuratorCacheListener。
2.2 CuratorCacheListener
CuratorCacheListener 是基于 CuratorCache 缓存实现的监听器,CuratorCache 对 ZooKeeper 事件监听进行了封装,能够自动处理反复注册监听。我们使用 CuratorCacheListener 时,需要使用构建器 CuratorCacheListenerBuilder 来对具体的事件监听进行构建,并且把 CuratorCacheListener 注册到 CuratorCache 缓存中。
首先我们需要构建 CuratorCache 缓存实例,在 CuratorCache 接口中,build 为静态方法,我们可以直接调用:
// 构建 CuratorCache 缓存实例
static CuratorCache build(CuratorFramework client, String path, CuratorCache.Options... options) {
return builder(client, path).withOptions(options).build();
}
我们来说明以下入参:CuratorFramework client
是 Curator 客户端;String path
是需要被监听的节点的路径;CuratorCache.Options... options
是对缓存设置的参数,我们可以设置以下 3 种:
public static enum Options {
// 单节点缓存
SINGLE_NODE_CACHE,
// 对数据进行压缩
COMPRESSED_DATA,
// CuratorCache 关闭后不清除缓存
DO_NOT_CLEAR_ON_CLOSE;
}
构建完缓存实例,我们再来构建 CuratorCacheListener ,在 CuratorCacheListener 接口中的构建方法 builder 为静态方法,我们可以直接调用:
// builder 方法,返回 CuratorCacheListenerBuilder 构建器,我们就可以使用具体的监听方法了
static CuratorCacheListenerBuilder builder() {
return new CuratorCacheListenerBuilderImpl();
}
最后我们需要把 CuratorCacheListener 注册到 CuratorCache 中,并开启缓存:
// 注册 CuratorCacheListener
cache.listenable().addListener(listener);
// 开启缓存
cache.start();
我们来看一个完整的例子:
@Test
void contextLoads() throws Exception {
// 获取客户端
CuratorFramework client = curatorService.getCuratorClient();
// 开启会话
client.start();
// 构建 CuratorCache 实例
CuratorCache cache = CuratorCache.build(client, "/mooc");
// 使用 Fluent 风格和 lambda 表达式来构建 CuratorCacheListener 的事件监听
CuratorCacheListener listener = CuratorCacheListener.builder()
// 开启对所有事件的监听
// type 事件类型:NODE_CREATED, NODE_CHANGED, NODE_DELETED;
// oldNode 原节点:ChildData 类,包括节点路径,节点状态 Stat,节点 data
// newNode 新节点:同上
.forAll((type, oldNode, newNode) -> {
System.out.println("forAll 事件类型:" + type);
System.out.println("forAll 原节点:" + oldNode);
System.out.println("forAll 新节点:" + newNode);
})
// 开启对节点创建事件的监听
.forCreates(childData -> {
System.out.println("forCreates 新节点:" + childData);
})
// 开启对节点更新事件的监听
.forChanges((oldNode, newNode) -> {
System.out.println("forChanges 原节点:" + oldNode);
System.out.println("forChanges 新节点:" + newNode);
})
// 开启对节点删除事件的监听
.forDeletes(oldNode -> {
System.out.println("forDeletes 原节点:" + oldNode);
})
// 初始化
.forInitialized(() -> {
System.out.println("forInitialized 初始化");
})
// 构建
.build();
// 注册 CuratorCacheListener 到 CuratorCache
cache.listenable().addListener(listener);
// CuratorCache 开启缓存
cache.start();
// mooc 节点创建
client.create().forPath("/mooc");
// mooc 节点更新
client.setData().forPath("/mooc","Wiki".getBytes());
// mooc 节点删除
client.delete().forPath("/mooc");
}
我们来查看 CuratorCacheListenerBuilder 接口中具体的事件监听,我们需要监听哪种事件就使用哪种方法:
public interface CuratorCacheListenerBuilder {
// 全部事件
CuratorCacheListenerBuilder forAll(CuratorCacheListener var1);
// 创建事件
CuratorCacheListenerBuilder forCreates(Consumer<ChildData> var1);
// 更新事件
CuratorCacheListenerBuilder forChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 创建和更新事件
CuratorCacheListenerBuilder forCreatesAndChanges(CuratorCacheListenerBuilder.ChangeListener var1);
// 删除事件
CuratorCacheListenerBuilder forDeletes(Consumer<ChildData> var1);
// 初始化后开启线程异步执行
CuratorCacheListenerBuilder forInitialized(Runnable var1);
// 子节点的事件
CuratorCacheListenerBuilder forPathChildrenCache(String var1, CuratorFramework var2, PathChildrenCacheListener var3);
// 节点本身的事件和子节点的事件
CuratorCacheListenerBuilder forTreeCache(CuratorFramework var1, TreeCacheListener var2);
// 节点本身的事件
CuratorCacheListenerBuilder forNodeCache(NodeCacheListener var1);
// 初始化后开启监听
CuratorCacheListenerBuilder afterInitialized();
// 构建方法
CuratorCacheListener build();
/*
* 更新事件时被调用
*/
@FunctionalInterface
public interface ChangeListener {
void event(ChildData var1, ChildData var2);
}
}
接下来我们执行测试方法,查看控制台输出:
forInitialized 初始化
forAll 事件类型:NODE_CREATED
forAll 原节点:null
forAll 新节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forCreates 新节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 事件类型:NODE_CHANGED
forAll 原节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forAll 新节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forChanges 原节点:ChildData{path='/mooc', stat=2760,2760,1598451457977,1598451457977,0,0,0,0,13,0,2760
, data=[49, 57, 50, 46, 49, 54, 56, 46, 48, 46, 49, 48, 53]}
forChanges 新节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 事件类型:NODE_DELETED
forAll 原节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
forAll 新节点:null
forDeletes 原节点:ChildData{path='/mooc', stat=2760,2761,1598451457977,1598451457984,1,0,0,0,4,0,2760
, data=[87, 105, 107, 105]}
我们发现,我们设置的 create,setData,delete 这 3 种事件都被监听到了,而且 forAll 每一种事件都监听到了,所以我们在使用的时候,选择我们需要的事件监听即可。
介绍完 CuratorCacheListener 监听器,并完成了事件监听的测试,那么 Zookeeper 的 Watch 是如何运行的呢?接下来我们就来介绍 Watch 的运行原理。
3. Watch 的原理
在介绍 Watch 的原理之前,我们先熟悉一个概念:Zookeeper 客户端对 Znode 的写操作,也就是新增节点、更新节点、删除节点这些操作,默认会开启监听;Zookeeper 客户端对 Znode 的读操作,也就是查询节点数据、查询节点是否存在、查询子节点等操作,需要手动设置开启监听。这也是为什么在 GetDataRequest 请求体中会有 watch 这个属性的原因。
Watch 的运行过程分为 4 部分,分别是:客户端注册 Watch 、服务端注册 Watch、服务端触发 Watch、客户端处理回调。
-
客户端注册 Watch
当我们使用 Zookeeper 客户端向 Zookeeper 服务端发送带有事件监听的请求时,Zookeeper 客户端会把该请求标记成带有 Watch 的请求,然后把 Watch 监听器注册到 ListenerManager 中。 -
服务端注册 Watch
Zookeeper 服务端接收到 Zookeeper 客户端发送过来的请求,解析请求体,判断该请求是否带有 Watch 事件,如果有 Watch 事件,就会把 Watch 事件注册到 WatchManager 中。 -
服务端触发 Watch
Zookeeper 服务端注册完 Watch 事件后,会调用 WatchManager 的 triggerWatch 方法来触发 Watch 事件,Watch 事件完成后,向客户端发送响应。 -
客户端处理回调
Zookeeper 客户端接收到 Zookeeper 服务端的响应后,解析响应体,根据响应体的类型去 ListenerManager 中查找相对应的 Watch 监听器,然后触发监听器的回调函数。
4. 总结
在本节中,我们学习了使用 Curator 的两种方式开启对事件的监听,也了解了 Watch 运行过程的 4 个部分。以下是本节内容的总结:
- Zookeeper Watch 的实现。
- Zookeeper Watch 的原理。