ZooKeeper Java API 使用(连接创建删除修改查看子节点)

tech2023-10-28  104

文章目录

Java 连接 zookeeper1.pom.xml2 操作连接zookeeper创建节点同步异步 更新节点删除节点查看节点/判断是否存在

Java 连接 zookeeper

1.pom.xml

加入zookeeper和zkClient依赖,zookeeper本身自带问题:

ZooKeeper的Watcher是一次性的,用过了需要再注册;session的超时后没有自动重连,生产环境中如果网络出现不稳定情况,那么这种情况出现的更加明显;没有领导选举机制,集群情况下可能需要实现stand by,一个服务挂了,另一个需要接替的效果;客户端只提供了存储byte数组的接口,而项目中一般都会使用对象。客户端接口需要处理的异常太多,并且通常,我们也不知道如何处理这些异常。

I0Itec这个zookeeper客户端基本上解决了上面的所有问题,主要有以下特性:

提供了zookeeper重连的特性——能够在断链的时候,重新建立连接,无论session失效与否.持久的event监听器机制—— ZKClient框架将事件重新定义分为了stateChanged、znodeChanged、dataChanged三种情况,用户可以注册这三种情况下的监听器(znodeChanged和dataChanged和路径有关),而不是注册Watcher。zookeeper异常处理——-zookeeper中繁多的Exception,以及每个Exception所需要关注的事情各有不同,I0Itec简单的做了封装。data序列化——简单的data序列化.(Serialzer/Deserialzer) 5)有默认的领导选举机制

但是zkClient需要重写几个方法:

create方法 : 创建节点时,如果节点已经存在,仍然抛出NodeExistException,可是我期望它不在抛出此异常。retryUtilConnected : 如果向zookeeper请求数据时(create,delete,setData等),此时链接不可用,那么调用者将会被阻塞直到链接建立成功;不过我仍然需要一些方法是非阻塞的,如果链接不可用,则抛出异常,或者直接返回。create方法 : 创建节点时,如果节点的父节点不存在,我期望同时也要创建父节点,而不是抛出异常。data监测: 我需要提供一个额外的功能来补充watch的不足,开启一个线程,间歇性的去zk server获取指定的path的data,并缓存起来。归因与watch可能丢失,以及它不能持续的反应znode数据的每一次变化,所以只能手动去同步获取。 <!--zookeeper 移除日志--> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.1</version> <exclusions> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <!--zkclient 移除zookeeper 和日志--> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.11</version> <exclusions> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency>

2 操作

连接zookeeper

Zookeeper(String connectionString, int sessionTimeout, watcher watcher) connectionString - zookeeper主机sessionTimeout- 会话超时watcher - 实现"监听器" 对象。zookeeper集合通过监视器对象返回连接状态如果链接zookeeper集群,则在connectionString处输入多个ip和端口号,例如"10.211.55.8:2181,10.211.55.9:2181" public static void main(String[] args) throws IOException, InterruptedException { //计数器 CountDownLatch countDownLatch = new CountDownLatch(1); //zookeeper连接 ZooKeeper zooKeeper = new ZooKeeper("10.211.55.8:2181", 5000, (WatchedEvent watchedEvent) -> { //监视器 //当server 和 client建立连接成功,服务器主动向客户端发一则消息 if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected){ System.out.println("连接成功"); countDownLatch.countDown(); } }); //声明连接对象,并不一定就成功了,主线程阻塞等待 countDownLatch.await(); System.out.println(zooKeeper.getSessionId()); zooKeeper.close(); }

创建节点

同步
// 同步 create(String path, byte[] data, List<ACL> acl, CreateMode createMode) // 异步 create(String path, byte[] data, List<ACL> acl, CreateMode createMode, AsynCallback.StringCallback callBack, Object ctx)

第一个参数表示路径,第二个参数表示数据,第三个参数表示权限,第四个参数表示节点类型(持久化节点)

@Test public void testCreate(){ zooKeeper = ZooKeeperUtils.genZkConn(); //ZooDefs.Ids.OPEN_ACL_UNSAFE // CreateMode.PERSISTENT 表示持久化节点 try { //需要先有节点 /java 才能创建 zooKeeper.create("/java/node1","JavaNode".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException | InterruptedException e) { e.printStackTrace(); } }

ephemeralOwner表示节点是持久化节点

第三个参数List<ACL> acl :Ids参数下包括OPEN_ACL_UNSAFE world:anyone:cdrwaCREATOR_ALL_ACL auth::cdrwaREAD_ACL_UNSAFE world:anyone:r。Perms参数:READ WRITE CREATE DELETE ADMIN ALL

自定义参数

第四个参数:CreateMode :CreateMode.PERSISTENT 持久化节点 CreateMode.PERSISTENT_SEQUENTIAL 持久化有序节点 CreateMode.EPHEMERAL 临时节点 @Test public void testCreateNode2() throws Exception { zooKeeper = ZooKeeperUtils.genZkConn(); String node3 = zooKeeper.create("/java/node3", "node3".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); System.out.println(node3); }

使用有序节点,返回当前节点名称

异步

同步创建时,创建完成后可以获得节点信息,但是异步创建,不一定可以直接得到节点信息,需要通过Create2Callback来中的processResult方法捕获参数

@Test public void testCreateNode3() throws Exception { zooKeeper = ZooKeeperUtils.genZkConn(); zooKeeper.create("/java/node4", "node4".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL , new AsyncCallback.Create2Callback() { //异步方式创建节点,信息通过本方法进行捕获 @Override public void processResult(int rc, String path, Object ctx, String name, Stat stat) { //0 表示创建成功 System.out.println(rc); System.out.println(path); System.out.println(name); //上下文参数 System.out.println(ctx); } },"I am context"); Thread.sleep(10000); System.out.println("结束"); }

更新节点

方法返回Stat 状态,可以查看节点的状态信息

// 同步 setData(String path, byte[] data, int version) // 异步 setData(String path, byte[] data, int version, StatCallback callBack, Object ctx) path:路径data:数据version:当前znode版本,数据更新时会更新版本号callBack:回调接口ctx:传递上下文参数

删除节点

// 同步 delete(String path, int version) // 异步 delete(String path, int version, AsyncCallback.VoidCallback callBack, Object ctx) path节点路径version版本callBack数据的版本号, -1代表不使用版本号,乐观锁机制ctx传递上下文参数

查看节点/判断是否存在

查看节点

// 同步 getData(String path, boolean watch, Stat stat) getData(String path, Watcher watcher, Stat stat) // 异步 getData(String path, boolean watch, DataCallback callBack, Object ctx) getData(String path, Watcher watcher, DataCallback callBack, Object ctx) 参数解释path节点路径boolean是否使用连接对象中注册的监听器stat元数据callBack异步回调接口,可以获得状态和数据ctx传递上下文参数

查看子节点

// 同步 getChildren(String path, boolean watch) getChildren(String path, Watcher watcher) getChildren(String path, boolean watch, Stat stat) getChildren(String path, Watcher watcher, Stat stat) // 异步 getChildren(String path, boolean watch, ChildrenCallback callBack, Object ctx) getChildren(String path, Watcher watcher, ChildrenCallback callBack, Object ctx) getChildren(String path, Watcher watcher, Children2Callback callBack, Object ctx) getChildren(String path, boolean watch, Children2Callback callBack, Object ctx)

判断是否存在

// 同步 exists(String path, boolean watch) exists(String path, Watcher watcher) // 异步 exists(String path, boolean watch, StatCallback cb, Object ctx) exists(String path, Watcher watcher, StatCallback cb, Object ctx)
最新回复(0)