ZooKeeper Watcher事件监听机制

tech2026-04-11  1

文章目录

watcher架构注册watcher客户端与服务器的连接状态 检查节点exists方法getData方法getChildren方法

watcher架构

watcher实现由三个部分组成

zookeeper服务端zookeeper客户端客户端的ZKWatchManager对象 客户端首先将 Watcher注册到服务端,同时将 Watcher对象保存到客户端的watch管理器中。当Zookeeper服务端监听的数据状态发生变化时,服务端会主动通知客户端,接着客户端的 Watch管理器会**触发相关 Watcher**来回调相应处理逻辑,从而完成整体的数据 发布/订阅流程

特性说明一次性watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册客户端顺序回调watcher回调是顺序串行执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响别的watcher执行轻量级WatchEvent是最小的通信单位,结构上只包含通知状态、事件类型和节点路径,并不会告诉数据节点变化前后的具体内容时效性watcher只有在当前session彻底失效时才会无效,若在session有效期内快速重连成功,则watcher依然存在,仍可接收到通知;

注册watcher

客户端与服务器的连接状态

KeeperState:通知状态SyncConnected:客户端与服务器正常连接时Disconnected:客户端与服务器断开连接时Expired:会话session失效时AuthFailed:身份认证失败时事件类型为:None public static ZooKeeper genZkConn(){ CountDownLatch count = new CountDownLatch(1); ZooKeeper zooKeeper = null; try { zooKeeper = new ZooKeeper("hadoop102:2181",5000,(WatchedEvent watchedEvent)->{ if (watchedEvent.getState()== Watcher.Event.KeeperState.SyncConnected){ System.out.println("Zookeeper连接成功"); count.countDown(); } }); } catch (IOException e) { e.printStackTrace(); } return zooKeeper; }

检查节点

exists方法

检查节点是否存在,第一个参数表示节点路径,第二个参数表示是否监听这个节点。同一个节点可以注册多个监听器。

exists(String path, boolean b) exists(String path, Watcher w)

getType()获取的值:

NodeCreated:节点创建NodeDeleted:节点删除NodeDataChanged:节点内容 @Test public void watcherExisted1() throws Exception{ zooKeeper = ZooKeeperUtils.genZkConn(); zooKeeper.exists("/watcher/node1",true); Thread.sleep(100000); }

监测到节点被删除,但是Watcher是一次注册一次监测,监测完成后不能继续监测。

@Test public void watcherExisted2() throws Exception{ zooKeeper = ZooKeeperUtils.genZkConn(); zooKeeper.exists("/watcher/node2", new Watcher() { @Override public void process(WatchedEvent event) { System.out.println("自定义Watcher"); System.out.println("Path = " + event.getPath()); System.out.println("EventType = "+event.getType()); try { zooKeeper.exists("/watcher/node2",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread.sleep(100000); }

在precoss方法中再次调用watcher可以做到持续监测。


getData方法

getData(String path, boolean b, Stat stat) getData(String path, Watcher w, Stat stat)

getType()获取的值:

NodeDeleted:节点删除NodeDataChange:节点内容发生变化

getChildren方法

getChildren(String path, boolean b) getChildren(String path, Watcher w)

getType()获取的值:

NodeChildrenChanged:子节点发生变化NodeDeleted:节点删除
最新回复(0)