1. zookeeper简介及作用
zookeeper是hadoop生态圈中的一个组件。本身是一种文件系统,结合监听通知机制来实现管理功能注册中心配置几种管理集群管理分布式锁队列管理
2. zookeeper架构
类似于linux文件系统,有一个根目录但每个节点不是目录,而是一个znode每个znode都可以存储数据
3. znode类型
持久节点持久有序节点 (队列实现)临时节点(断开连接就删除)临时有序节点 (分布式锁实现)
4. 监听通知机制
客户端监听zookeeper节点,当zookeeper节点改变时,会通知客户端。
5. 常用命令
ls 节点名称: 查询当前节点下的所有节点get 节点名称: 查看节点下的数据create 节点名称 数据 : 创建节点
create -s : -s创建有序节点create -e : -e创建临时节点 set 节点名称 新数据: 修改数据delete 节点名称: 删除没有子节点的节点rmr 节点名称: 删除节点及所有子节点
6. zookeeper集群特点
集群中必须master节点master可以执行读写操作,slave只进行读操作mater挂掉时,投票选举新的master
7. 节点角色
Leader:master主节点Follower: 从节点,参与选举新的LeaderObserver: 从节点,不参与投票Looking: 正在找Leader节点
8. 选举策略
每一个zookeeper服务都会被分配一个全局唯一的myid,zookeeper会给每一个数据分配一个全局位移的zxid,数据越新zxid越大。选举策略
选举zxid最大的节点作为leader。zxid相同的节点中,选举myid最大的节点作为leader。
9. java连接zookeeper
public class ZkUtil {
public static CuratorFramework
cf(){
Retrypolicy retryPolicy
= new ExponentialBackoffRetry(3000, 2);
CuratorFramework cf
= CuratorFrameworkFactory
.builder().connectString("192.168.199.209:2181,192.168.168.199.209:2182").retryPolicy(retryPolic
).build();
cf
.start();
return cf
;
}
}
10. java 操作 zookeeper
查询操作
public class Demo{
CuratorFramework cf
= ZkUtl
.cf();
public void selectNode() throw Exception
{
List
<String> strings
= cf
.getChildren().forPath("/");
for(String string
: strings
){
System
.out
.println(string
);
}
}
public void getData() throw Exception
{
byte[] bytes
= cf
.getData
.forPath("/xxx");
System
.out
.println(new String(bytes
,"UTF-8"));
}
}
添加操作
public void create(){
cf
.create
.withMode(CreateMode
.PERSISTENT
).forPath("/demo","aaa".getBytes());
}
修改操作
public void update(){
cf
.setData().forPath("/demo","bbb".getBytes());
}
删除操作
public void delete(){
cf
.delete().deletingChildrenIfNeeded().forPath("/demo");
}
查看状态
public void status(){
cf
.checkExists().forPath("/demo");
}
11. java实现监听通知
public class Demo{
CuratorFramework cf
= ZkUtil
.cf();
public void listen(){
NodeCache nodeCache
= new NodeCache(cf
,"/demo");
nodeCache
.start();
nodeCache
.getListenable().addListener(new NodeCacheListener(){
@override
public void nodeChanged() throw Exception
{
byte[] data
= nodeCache
.getCurrentData().getData();
Stat stat
= nodeCache
.getCurrentData
.getStat();
String path
= nodeCache
.getCurrentData
.getPath();
System
.out
.println("监听的节点是:" + path
);
System
.out
.println("节点现在的数据:" + new String(data
, "UTF08"));
System
.out
.println("节点的状态:" + stat
);
}
});
System
.in
.read();
}
}