zookeeper笔记+源码刨析

tech2023-06-16  106

会不断更新!冲冲冲!跳转连接

https://blog.csdn.net/qq_35349982/category_10317485.html

zookeeper

1.介绍

Zookeeper 分布式数据一致性的解决方案,分布式应用程序可以基于他实现诸如数据订阅/发布,负载均衡,命名服务,集群管理,分布式锁,分布式队列

2.安装篇

2.1.安装单机版

1.下载

cd /usr/local/src #进入指定目录 #下载zookeeper wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

2.解压

tar -zxvf zookeeper-3.4.13.tar.gz cd zookeeper-3.4.13

3.修改配置文件

#进配置文件 cd zookeeper-3.4.13/conf #将zoo_sample.cfg这个文件复制为zoo.cfg (必须是这个文件名) cp zoo_sample.cfg zoo.cfg #新增data跟log目录 cd /usr/local/src/zookeeper/zookeeper-3.4.9 mkdir data mkdir log #进入zoo.cfg文件进行编辑 vi zoo.cfg #修改dataDir 新增dataLogDir dataDir=/usr/local/src/zookeeper/data dataLogDir=/usr/local/src/zookeeper/log

4.配置环境变量

export ZOOKEEPER_INSTALL=/usr/local/src/zookeeper export PATH=$PATH:$ZOOKEEPER_INSTALL/bin

5.启动

cd /usr/local/src/zookeeper/bin ./zkServer.sh start

2.2 安装集群版

1.改名称

mv zookeeper-3.4.14 zookeeper01

2.复制多分

cp -r zookeeper01/ zookeeper02 cp -r zookeeper01/ zookeeper03

3.修改配置文件

#创建两个文件夹 mkdir data mkdir logs #修改配置文件名称 cd conf mv zoo_sample.cfg zoo.cfg

4.编写配置文件 (修改三个配置文件)

clientPort=2181 dataDir=/usr/local/src/zookeeper2/data dataLogDir=/usr/local/src/zookeeper2/logs

5.修改集群配置文件

先查看IP ( 不可以使用服务器IP)

ifconfig

在每个zookeeper文件的data目录下创建一个myid文件,内容分别为1.2.3,这个文件就是记录每个服务器的ID

touch myid

在每个zookeeper中的zoo.cfg文件中,配置集群服务器Ip

server.1=172.17.153.160:2881:3881 server.2=172.17.153.160:2882:3882 server.3=172.17.153.160:2883:3883 #server.服务器ID=服务器IP地址:服务器之间通信端⼝:服务器之间投票选举端⼝ jps #里面有6456 QuorumPeerMain 代表启动成功

6.启动

./zkServer.sh start #启动 ./zkServer.sh status #查看状态 ./zkServer.sh stop #停止

安装的问题总结

1.停止8080端口

netstat -nltp | grep 2181 kill -9 3027

2.注意开放的端口,以及防火墙的问题

3.查询日志

./zkServer.sh start-foreground #查询日志

4.在log中有一个 out文件,看下里面的报错信息

5.端口被占用后,把dara和logs中的旧文件全部删掉

https://blog.csdn.net/Hello_World_QWP/article/details/90765608?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.channel_param

https://blog.csdn.net/qq_36651243/article/details/89396618

https://www.jianshu.com/p/0335f1f41420

https://www.cnblogs.com/zimo-jing/p/9037853.html

https://blog.csdn.net/weixin_45793065/article/details/106709479

3.基本命令

3.1 命令行

1.1.创建节点

#启动 ./zkCli.sh #查询当前节点 ls / #创建顺序节点 create -s /zk-test 123 #创建临时节点(客户端重启,节点关闭) create -e /zk-temp 123 #创建永久节点 create /zk-permanent 123

1.2.读取节点

#获取节点 并查看信息 get /zk-permanent

1.3.更新 删除节点

get /zk-permanent #更新 set /zk-permanent 456 #删除 (若该节点存在子节点则无法删除,需删除子节点) delete /zk-permanent

3.2 ZooKeeper类

<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.2</version> </dependency>

2.1连接并创建

public class CreateNote implements Watcher { private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; /* 建立会话 */ public static void main(String[] args) throws IOException, InterruptedException, KeeperException { /* 客户端可以通过创建一个zk实例来连接zk服务器 new Zookeeper(connectString,sesssionTimeOut,Wather) connectString: 连接地址:IP:端口 sesssionTimeOut:会话超时时间:单位毫秒 Wather:监听器(当特定事件触发监听时,zk会通过watcher通知到客户端) */ zooKeeper = new ZooKeeper("47.95.1.96:2181", 5000, new CreateNote()); System.out.println(zooKeeper.getState()); // 计数工具类:CountDownLatch:不让main方法结束,让线程处于等待阻塞 //countDownLatch.await();\ Thread.sleep(Integer.MAX_VALUE); } /* 回调方法:处理来自服务器端的watcher通知 */ public void process(WatchedEvent watchedEvent) { // SyncConnected if(watchedEvent.getState() == Event.KeeperState.SyncConnected){ //解除主程序在CountDownLatch上的等待阻塞 System.out.println("process方法执行了..."); // 创建节点 try { createNoteSync(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } /* 创建节点的方法 */ private static void createNoteSync() throws KeeperException, InterruptedException { /** * path :节点创建的路径 * data[] :节点创建要保存的数据,是个byte类型的 * acl :节点创建的权限信息(4种类型) * ANYONE_ID_UNSAFE : 表示任何人 * AUTH_IDS :此ID仅可用于设置ACL。它将被客户机验证的ID替换。 * OPEN_ACL_UNSAFE :这是一个完全开放的ACL(常用)--> world:anyone * CREATOR_ALL_ACL :此ACL授予创建者身份验证ID的所有权限 * createMode :创建节点的类型(4种类型) * PERSISTENT:持久节点 * PERSISTENT_SEQUENTIAL:持久顺序节点 * EPHEMERAL:临时节点 * EPHEMERAL_SEQUENTIAL:临时顺序节点 String node = zookeeper.create(path,data,acl,createMode); */ String note_persistent = zooKeeper.create("/my-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("节点创建完成"); // // 持久节点 // String note_persistent = zooKeeper.create("/lg-persistent", "持久节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // // // 临时节点 // String note_ephemeral = zooKeeper.create("/lg-ephemeral", "临时节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // // // 持久顺序节点 // String note_persistent_sequential = zooKeeper.create("/lg-persistent_sequential", "持久顺序节点内容".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL); // // System.out.println("创建的持久节点" + note_persistent); // System.out.println("创建的临时节点" + note_ephemeral); // System.out.println("创建的持久顺序节点" + note_persistent_sequential); } }

2.2查询,更新

/* 更新数据节点内容的方法 */ private void updateNoteSync() throws KeeperException, InterruptedException { /* path:路径 data:要修改的内容 byte[] version:为-1,表示对最新版本的数据进行修改 zooKeeper.setData(path, data,version); */ //新建 Stat stat = zooKeeper.setData("/my-persistent", "我新建了一个节点".getBytes(), -1); //查询 byte[] data = zooKeeper.getData("/my-persistent", false, null); System.out.println("修改前的值:" + new String(data)); //修改 Stat stat1 = zooKeeper.setData("/my-persistent", "我修改了我新建的节点".getBytes(), -1); //查询 byte[] data2 = zooKeeper.getData("/my-persistent", false, null); System.out.println("修改后的值:" + new String(data2)); }

2.3删除

/* 删除节点的方法 */ private void deleteNoteSync() throws KeeperException, InterruptedException { /* zooKeeper.exists(path,watch) :判断节点是否存在 zookeeper.delete(path,version) : 删除节点 */ Stat stat = zooKeeper.exists("/lg-persistent/c1", false); System.out.println(stat == null ? "该节点不存在":"该节点存在"); if(stat != null){ zooKeeper.delete("/lg-persistent/c1",-1); } Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false); System.out.println(stat2 == null ? "该节点不存在":"该节点存在"); }

3.3 curator

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency>

3.1连接

//不使用fluent编程风格 RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3); // 使用fluent编程风格 CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("47.95.1.96:2181") .sessionTimeoutMs(50000) .connectionTimeoutMs(30000) .retryPolicy(exponentialBackoffRetry) .namespace("base") // 独立的命名空间 /base .build(); client.start(); System.out.println("会话2创建了"); // 创建节点 String path = "/lg-curator/c1"; String s = client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes()); System.out.println("节点递归创建成功,该节点路径" + s);

3.2新增,获取状态信息

// 创建节点 String path = "/lg-curator/c1"; String s = client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).forPath(path, "init".getBytes()); System.out.println("节点递归创建成功,该节点路径" + s); // 获取节点的数据内容及状态信息 // 数据内容 byte[] bytes = client.getData().forPath(path); System.out.println("获取到的节点数据内容:" + new String(bytes)); // 状态信息 Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath(path); System.out.println("获取到的节点状态信息:" + stat );

3.3更新

// 更新节点内容 //1 int version = client.setData().withVersion(stat.getVersion()).forPath(path, "修改内容1".getBytes()).getVersion(); System.out.println("当前的最新版本是" + version); byte[] bytes2 = client.getData().forPath(path); System.out.println("修改后的节点数据内容:" + new String(bytes2)); // BadVersionException client.setData().withVersion(stat.getVersion()).forPath(path,"修改内容2".getBytes());

3.4删除

// 删除节点 String path = "/lg-curator"; client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path); System.out.println("删除成功,删除的节点" + path);

3.5 三种监听

NodeCache: 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作。

PathChildrenCache: 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听

TreeCache: 可以将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647(int类型的最大值)

1.PathChildrenCache实现

//PathChildrenCache的使用 //创建监听事件 PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/serviceList", false);//监听路径下的所有节点 pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//异步初始化 pathChildrenCache.getListenable().addListener(new CilentListener()); //==================== public class CilentListener implements PathChildrenCacheListener { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { System.out.println(pathChildrenCacheEvent.getData().getPath()); } if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { System.out.println("新增"+pathChildrenCacheEvent.getData().getPath()); //获取新增的节点地址 String path = pathChildrenCacheEvent.getData().getPath(); String[] split = path.split("/"); //获取服务器的地址 String serviceValue = CuratorUtils.findServiceRegister(pathChildrenCacheEvent.getData().getPath()); //添加元素 Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap; serverAddressMap.put(split[split.length-1],serviceValue); } if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { String path = pathChildrenCacheEvent.getData().getPath(); String[] split = path.split("/"); System.out.println("移除"+pathChildrenCacheEvent.getData().getPath()); //在Map中移除节点 Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap; serverAddressMap.remove(split[split.length-1]); System.out.println("map中的数量"+serverAddressMap.size()); } } }

工具类

package com.lagou.zookeeper; import com.lagou.ZKConstant; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; import java.nio.ByteBuffer; import java.util.*; public class CuratorUtils { private static String serverList = "serviceList"; private static int serverNum = 0; public static CuratorFramework build(){ CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() //.connectString("47.95.1.96:2181") .connectString("127.0.0.1:2181") .namespace(ZKConstant.ZK_NAMESPACE) //.connectionTimeoutMs(15000) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .build(); curatorFramework.start(); return curatorFramework; } /** * 服务注册 * @throws Exception */ public static CuratorFramework serviceRegister(String address,int port) throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); //创建临时节点,拼接ip+端口 String serviceAddress = address+":"+port; Stat s = client.checkExists().forPath("/"+serverList); if (s == null) { //根节点 client.create().withMode(CreateMode.PERSISTENT).forPath("/"+serverList); } client.create().creatingParentContainersIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath("/"+serverList+"/"+String.valueOf(serverNum+1),serviceAddress.getBytes()); return client; } /** * 服务列表 * @return * @throws Exception */ public static List<String> findServiceRegList() throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); List<String> nodeList = client.getChildren().forPath("/" + serverList); return nodeList; } public static void deleteServiceRegister(String path) throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); client.delete().forPath(path); // return nodeList; } //获取内容 public static String findServiceRegister(String path) throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); byte[] bytes = client.getData().forPath(path); return new String(bytes); } /** * 新增时间响应 * @param path * @param dateStr * @throws Exception */ public static void addServiceTime(String path,String dateStr) throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); if(client.checkExists().forPath(path)==null){ client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,dateStr.getBytes()); }else{ client.setData().forPath(path,dateStr.getBytes()); } } public static void addReponseTime(String path,String dateStr) throws Exception { //创建连接 CuratorFramework client= CuratorUtils.build(); if(client.checkExists().forPath(path)==null){ client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path,dateStr.getBytes()); }else{ client.setData().forPath(path,dateStr.getBytes()); } } public static byte[] longToBytes(long x) { ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); buffer.putLong(x); return buffer.array(); } /** * 求Map<K,V>中Value(值)的最小值 * @param map * @return */ public static Object getMinValue(Map<String, Integer> map) { if (map == null) return null; Collection<Integer> c = map.values(); Object[] obj = c.toArray(); Arrays.sort(obj); return obj[0]; } public static void main(String[] args) throws Exception{ int numbers[] ={1,2,5}; Random random = new Random(); int i = random.nextInt(numbers.length); System.out.println(i); System.out.println(numbers[i]); // findServiceRegList(); // HashMap<String, Integer> stringStringHashMap = new HashMap<>(); // stringStringHashMap.put("8999",90 ); // stringStringHashMap.put("8998",80 ); // stringStringHashMap.put("8997",70 ); // stringStringHashMap.put("8996",70 ); // // List<Integer> serverServiceList = new ArrayList<Integer>(stringStringHashMap.values()); // List<String> serverKeyList = new ArrayList<String>(stringStringHashMap.keySet()); // System.out.println(serverServiceList); // try { // //注册服务 // CuratorFramework curatorFramework = serviceRegister("127.0.0.1", 8999); // //创建监听 // PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/serviceList", false);//监听msg_server_list路径下的所有节点 // pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//异步初始化 // pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { // @Override // public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { // PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); // if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // System.out.println(pathChildrenCacheEvent.getData().getPath()); // } // // if (type.equals(PathChildrenCacheEvent.Type.INITIALIZED)) { // System.out.println("新增"+pathChildrenCacheEvent.getData().getPath()); // } // if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // System.out.println("移除"+pathChildrenCacheEvent.getData().getPath()); // } // } // }); // Thread.sleep(Long.MAX_VALUE); // String testPath="pathChildrenCacheTest"; // //创建连接 // CuratorFramework client= CacheListenerUtils.build(); // //如果testPath存在,删除路径 // Stat stat = client.checkExists().forPath("/"+testPath); // if(stat != null) // { // client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath); // } // //创建testPath // client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath,testPath.getBytes()); // // //创建PathChildrenCache // //参数:true代表缓存数据到本地 // PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/" + testPath,true); // //BUILD_INITIAL_CACHE 代表使用同步的方式进行缓存初始化。 // pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); // pathChildrenCache.getListenable().addListener((cf, event) -> { // PathChildrenCacheEvent.Type eventType = event.getType(); // switch (eventType) { // case CONNECTION_RECONNECTED: // pathChildrenCache.rebuild(); // break; // case CONNECTION_SUSPENDED: // break; // case CONNECTION_LOST: // System.out.println("Connection lost"); // break; // case CHILD_ADDED: // System.out.println("Child added"); // break; // case CHILD_UPDATED: // System.out.println("Child updated"); // break; // case CHILD_REMOVED: // System.out.println("Child removed"); // break; // default: // } // }); // // //创建子节点1 // client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/1",testPath.getBytes()); // Thread.sleep(1000); // //创建子节点1 // client.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/"+testPath+"/2",testPath.getBytes()); // Thread.sleep(1000); // //删除子节点1 // client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/1"); // Thread.sleep(1000); // //删除子节点2 // client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/"+testPath+"/2"); // Thread.sleep(1000); // // // // // pathChildrenCache.close(); // } catch (Exception e) { // e.printStackTrace(); // // TODO: handle exception // } } } -Dlog4j.configuration=file:C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\log4j.properties

4.应用场景

1.数据的发布与订阅

发布与订阅即所谓的配置中心

推(Push)

拉(Pull)

具备三个特征

数据量比较小数据会动态发生变化集群中各机器共享,配置一致

配置获取

配置变更

2.命名服务

Zookeeper节点创建的API接口创建顺序节点,并且返回值中会返回这个节点的完整名字,来生成全局唯一的ID

集群管理

Master选举

分布式锁

分布式队列

FIFO先入先出分布式屏障

3.集群管理

利用临时节点,判断服务器的断开与连接,监听。来管理集群

分布式日志收集系统

4.Master选举

Master一般协调集群中的其他系统单元,具有对分布式系统状态变更的绝对权。

读写分离的产经,客户端的写请求往往是由Master来处理Master负责处理一些复杂的逻辑,同步给其他单元

5.分布式锁

排他锁

事务T1对数据对象O1加上了排他锁,枷锁期间,只允许事务T1对O1进行读取和更新操作,加锁期间,其他事务不能对这个数据对象进行任何类型的操作

共享锁(读锁)

事务T1对数据对象O1加上了共享锁,则当前事务只能对O1进行读取事务,其他事务也只能对这个数据对象加共享锁

羊群效应

6.分布式队列

特征

ZAB协议需要确保那些已经在Leader服务器上提交的事务最终被所以服务器都提交ZAB协议需要确保丢弃那些只在Leader服务器上被提出的事务

FIFO先入先出

First Input First Output 先入先出

分布式屏障

等待队列元素聚集后同意安排处理执行的Barrier模型

5.Zookeeper原理

5.1 ZAB协议

Zookeeper Atomic Broadcast(ZAB,Zookeeper源自消息广播协议)

支持崩溃恢复的原子广播协议

cai用ZAB协议来实现分布式数据的一致性,主备模式的系统架构来保持个副本之间的数据一致性,表现形式就是 采用单一的主进程来接受并处理客户端的所有事务请求,然后采用事务Proposal的形式广播到所有的副本进程中

5.2两种模式

崩溃恢复与消息广播

5.3三种状态*

LOOKING :Leader选举阶段FOLLOWING:Follower服务器和Leader服务器LEADING : Leader服务器作为主进程领导状态

所有进程初始化状态都是LOOKING状态

5.4ZAB与Paxos的联系与区别

都存在一个类似于Leader进程的角色,负责协调多个Follower进程的运行Leader进程都会等待超过半数的Follower做出正确反馈后,才会提议进行提交在ZAB协议中,每个Proposal都包含一个epoch值,用来代表当前的Leader周期,在Paxos中同样存在这样一个标识

ZAB协议主要用于高可用的分布式数据主备系统

Paxos算法主要构件一个分布式的一致性状态机系统

5.4服务器的角色*

Leader

事务请求的唯一调度和处理者,保证集群事务集群内部各服务器的调度者

Follwer

处理客户端非事务性请求(读取数据),转发事务给Leader参与事务请求Proposal的投票参数Leader选举投票

Observer

提供非事务服务

不参与投票

6.源码刨析

1.搭建项目

在VM options中添加(log4j的文件目录)

-Dlog4j.configuration=file:C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\log4j.properties

在program arguments**( 配置zoo.cfg的目录)**

C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\zoo.cfg

https://www.cnblogs.com/heyonggang/p/12123991.html

2.server的创建流程

启动类 ZooKeeperServerMain

启动类 main.initializeAndRun(args) 注册jmx ManagedUtil.registerLog4jMBeans();

解析配置文件

config.parse(args[0]);

初始化日志

FileTxnSnapLog txnLog = null; txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); 初始化zkServer对象 final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null); 创建ServerCnxnFactory对象=====ServerCnxnFactory是Zookeeper中的重要组件,负责处理客户端与服务器的连接 //初始化server端IO对象,默认是NIOServerCnxnFactory:Java原生NIO处理网络IO事件 cnxnFactory = ServerCnxnFactory.createFactory(); 初始化配置信息 //初始化配置信息 cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); @Override public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException { if (secure) { throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn"); } configureSaslLogin(); maxClientCnxns = maxcc; //会话超时时间 sessionlessCnxnTimeout = Integer.getInteger( ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000); // We also use the sessionlessCnxnTimeout as expiring interval for // cnxnExpiryQueue. These don't need to be the same, but the expiring // interval passed into the ExpiryQueue() constructor below should be // less than or equal to the timeout. //过期队列 cnxnExpiryQueue = new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout); expirerThread = new ConnectionExpirerThread(); //根据CPU个数计算selector线程的数量 int numCores = Runtime.getRuntime().availableProcessors(); // 32 cores sweet spot seems to be 4 selector threads numSelectorThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_SELECTOR_THREADS, Math.max((int) Math.sqrt((float) numCores/2), 1)); if (numSelectorThreads < 1) { throw new IOException("numSelectorThreads must be at least 1"); } //计算woker线程的数量 numWorkerThreads = Integer.getInteger( ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores); //worker线程关闭时间 workerShutdownTimeoutMS = Long.getLong( ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000); LOG.info("Configuring NIO connection handler with " + (sessionlessCnxnTimeout/1000) + "s sessionless connection" + " timeout, " + numSelectorThreads + " selector thread(s), " + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and " + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes/1024) + " kB direct buffers."))); //初始化selector线程 for(int i=0; i<numSelectorThreads; ++i) { selectorThreads.add(new SelectorThread(i)); } this.ss = ServerSocketChannel.open(); ss.socket().setReuseAddress(true); LOG.info("binding to port " + addr); ss.socket().bind(addr); ss.configureBlocking(false); //初始化accept线程,这里看出accept线程只有一个,里面会注册监听ACCEPT事件 acceptThread = new AcceptThread(ss, addr, selectorThreads); } 启动服务 //启动服务:此方法除了启动ServerCnxnFactory,还会启动ZooKeeper cnxnFactory.startup(zkServer); package org.apache.zookeeper.server; //启动类 ZooKeeperServerMain main = new ZooKeeperServerMain(); // 解析单机模式的配置对象,并启动单机模式 protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException { try { //注册jmx // JMX的全称为Java Management Extensions.是管理Java的一种扩展。 // 这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等 ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } // 创建服务配置对象 ServerConfig config = new ServerConfig(); //如果入参只有一个,则认为是配置文件的路径 if (args.length == 1) { // 解析配置文件 config.parse(args[0]); } else { // 参数有多个,解析参数 config.parse(args); } // 根据配置运行服务 runFromConfig(config); } public void runFromConfig(ServerConfig config) throws IOException, AdminServerException { LOG.info("Starting server"); FileTxnSnapLog txnLog = null; try { // Note that this thread isn't going to be doing anything else, // so rather than spawning another thread, we will just call // run() in this thread. // create a file logger url from the command line args //初始化日志文件 txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir); // 初始化zkServer对象 final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null); // 服务结束钩子,用于知道服务器错误或关闭状态更改。 final CountDownLatch shutdownLatch = new CountDownLatch(1); zkServer.registerServerShutdownHandler( new ZooKeeperServerShutdownHandler(shutdownLatch)); // Start Admin server // 创建admin服务,用于接收请求(创建jetty服务) adminServer = AdminServerFactory.createAdminServer(); // 设置zookeeper服务 adminServer.setZooKeeperServer(zkServer); // AdminServer是3.5.0之后支持的特性,启动了一个jettyserver,默认端口是8080,访问此端口可以获取Zookeeper运行时的相关信息 adminServer.start(); boolean needStartZKServer = true; //---启动ZooKeeperServer //判断配置文件中 clientportAddress是否为null if (config.getClientPortAddress() != null) { //ServerCnxnFactory是Zookeeper中的重要组件,负责处理客户端与服务器的连接 //初始化server端IO对象,默认是NIOServerCnxnFactory:Java原生NIO处理网络IO事件 cnxnFactory = ServerCnxnFactory.createFactory(); //初始化配置信息 cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false); //启动服务:此方法除了启动ServerCnxnFactory,还会启动ZooKeeper cnxnFactory.startup(zkServer); // zkServer has been started. So we don't need to start it again in secureCnxnFactory. needStartZKServer = false; } if (config.getSecureClientPortAddress() != null) { secureCnxnFactory = ServerCnxnFactory.createFactory(); secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true); secureCnxnFactory.startup(zkServer, needStartZKServer); } // 定时清除容器节点 //container ZNodes是3.6版本之后新增的节点类型,Container类型的节点会在它没有子节点时 // 被删除(新创建的Container节点除外),该类就是用来周期性的进行检查清理工作 containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor, Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)), Integer.getInteger("znode.container.maxPerMinute", 10000) ); containerManager.start(); // Watch status of ZooKeeper server. It will do a graceful shutdown // if the server is not running or hits an internal error. // ZooKeeperServerShutdownHandler处理逻辑,只有在服务运行不正常的情况下,才会往下执行 shutdownLatch.await(); // 关闭服务 shutdown(); if (cnxnFactory != null) { cnxnFactory.join(); } if (secureCnxnFactory != null) { secureCnxnFactory.join(); } if (zkServer.canShutdown()) { zkServer.shutdown(true); } } catch (InterruptedException e) { // warn, but generally this is ok LOG.warn("Server interrupted", e); } finally { if (txnLog != null) { txnLog.close(); } } }

3.Leader选举

概念

外部投票:特指其他服务器发来的投票内部投票:服务器自身当前的投票选举轮次:ZooKeeper服务器Leader选举的轮次,即logical clock(逻辑时钟)Pk: 指对内部投票和外部投票进行一个对比来确定是否需要变更内部投票sendqueue:选票发送队列:保存待发送的投票recvqueue:选票接收队列,用于保存接收到的外部投票

//====================

服务器启动时期的Leader选举

1)每个Server发出一个投票

2)接受来自给各个服务器的投票

3)处理投票

优先检查ZXID. ZXID比较大的服务器优先作为Leader如果ZXID相同,那么就比较myid。 myId较大的服务器作为

4)统计投票

5)改变服务器状态

服务器运行时期的Leader的选举

​ 1)变更状态

​ 2)每个Server会发出一个投票

​ 3) 接受来自各个服务器的投票,与启动时过程相同

​ 4)处理投票

​ 5)统计投票

​ 6)改变服务器的状态

public interface Election { //҅寻找Leader public Vote lookForLeader() throws InterruptedException; //停止 public void shutdown(); }

FastLeaderElection有三个重要的类

1.Notification

表示收到的选举投票信息(其他服务器发来的选举投票信息)

/* Notification表示收到的选举投票信息(其他服务器发来的选举投票信息), 其包含了被选举者的id、zxid、选举周期等信息, 其buildMsg方法将选举信息封装至ByteBuffer中再进行发送 */ static public class Notification { /* * Format version, introduced in 3.4.6 */ public final static int CURRENTVERSION = 0x2; int version; /* * Proposed leader * */ // 被推选的leader的id long leader; /* * zxid of the proposed leader */ // 被推选的leader的事务id long zxid; /* * Epoch */ // 推选者的选举周期 long electionEpoch; /* * current state of sender */ // 推选者的状态 QuorumPeer.ServerState state; /* * Address of sender */ // 推选者的id long sid; QuorumVerifier qv; /* * epoch of the proposed leader */ // 被推选者的选举周期 long peerEpoch; }

2.ToSend

表示发送给其他服务器的选举投票信息

/* ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的id、zxid、选举周期等信息 */ static public class ToSend { static enum mType {crequest, challenge, notification, ack} ToSend(mType type, long leader, long zxid, long electionEpoch, ServerState state, long sid, long peerEpoch, byte[] configData) { this.leader = leader; this.zxid = zxid; this.electionEpoch = electionEpoch; this.state = state; this.sid = sid; this.peerEpoch = peerEpoch; this.configData = configData; } /* * Proposed leader in the case of notification */ //被推举的leader的id long leader; /* * id contains the tag for acks, and zxid for notifications */ // 被推举的leader的最大事务id long zxid; /* * Epoch */ // 推举者的选举周期 long electionEpoch; /* * Current state; */ // 推举者的状态 QuorumPeer.ServerState state; /* * Address of recipient */ // 推举者的id long sid; /* * Used to send a QuorumVerifier (configuration info) */ byte[] configData = dummyData; /* * Leader epoch */ // 被推举的leader的选举周期 long peerEpoch; } LinkedBlockingQueue<ToSend> sendqueue; LinkedBlockingQueue<Notification> recvqueue;

3.Messenger

protected class Messenger { /** * Receives messages from instance of QuorumCnxManager on * method run(), and processes such messages. */ class WorkerReceiver extends ZooKeeperThread { //是否停止 volatile boolean stop; //服务器之间的连接 QuorumCnxManager manager; WorkerReceiver(QuorumCnxManager manager) { super("WorkerReceiver"); this.stop = false; this.manager = manager; } public void run() { //响应 Message response; while (!stop) {//不停止 // Sleeps on receive try { //从RecvQueue取一个选举投票信息(从其他服务器发射过来的) response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); //无投票则跳过 if(response == null) continue; // The current protocol and two previous generations all send at least 28 bytes if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } // this is the backwardCompatibility mode in place before ZK-107 // It is for a version of the protocol in which we didn't send peer epoch // With peer epoch and version the message became 40 bytes boolean backCompatibility28 = (response.buffer.capacity() == 28); // this is the backwardCompatibility mode for no version information boolean backCompatibility40 = (response.buffer.capacity() == 40); response.buffer.clear(); // Instantiate Notification and set its attributes Notification n = new Notification(); int rstate = response.buffer.getInt(); long rleader = response.buffer.getLong(); long rzxid = response.buffer.getLong(); long relectionEpoch = response.buffer.getLong(); long rpeerepoch; int version = 0x0; if (!backCompatibility28) { rpeerepoch = response.buffer.getLong(); if (!backCompatibility40) { /* * Version added in 3.4.6 */ version = response.buffer.getInt(); } else { LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid); } } else { LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid); rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid); } QuorumVerifier rqv = null; // check if we have a version that includes config. If so extract config info from message. if (version > 0x1) { int configLength = response.buffer.getInt(); byte b[] = new byte[configLength]; response.buffer.get(b); synchronized(self) { try { rqv = self.configFromString(new String(b)); QuorumVerifier curQV = self.getQuorumVerifier(); if (rqv.getVersion() > curQV.getVersion()) { LOG.info("{} Received version: {} my version: {}", self.getId(), Long.toHexString(rqv.getVersion()), Long.toHexString(self.getQuorumVerifier().getVersion())); if (self.getPeerState() == ServerState.LOOKING) { LOG.debug("Invoking processReconfig(), state: {}", self.getServerState()); self.processReconfig(rqv, null, null, false); if (!rqv.equals(curQV)) { LOG.info("restarting leader election"); self.shuttingDownLE = true; self.getElectionAlg().shutdown(); break; } } else { LOG.debug("Skip processReconfig(), state: {}", self.getServerState()); } } } catch (IOException e) { LOG.error("Something went wrong while processing config received from {}", response.sid); } catch (ConfigException e) { LOG.error("Something went wrong while processing config received from {}", response.sid); } } } else { LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid); } /* * If it is from a non-voting server (such as an observer or * a non-voting follower), respond right away. */ if(!validVoter(response.sid)) { //获取自己的投票 Vote current = self.getCurrentVote(); QuorumVerifier qv = self.getQuorumVerifier(); //构建对象 ToSend notmsg = new ToSend(ToSend.mType.notification, current.getId(), current.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); //放到队列等待发送 sendqueue.offer(notmsg); } else { //接受到服务器的选票信息 // Receive new message if (LOG.isDebugEnabled()) { LOG.debug("Receive new notification message. My id = " + self.getId()); } // State of peer that sent this message QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING; switch (rstate) {//读取状态 case 0: ackstate = QuorumPeer.ServerState.LOOKING; break; case 1: ackstate = QuorumPeer.ServerState.FOLLOWING; break; case 2: ackstate = QuorumPeer.ServerState.LEADING; break; case 3: ackstate = QuorumPeer.ServerState.OBSERVING; break; default: continue; } n.leader = rleader; n.zxid = rzxid; //获取选举周期 n.electionEpoch = relectionEpoch; n.state = ackstate; //设置服务器ID n.sid = response.sid; n.peerEpoch = rpeerepoch; n.version = version; n.qv = rqv; /* * Print notification info */ if(LOG.isInfoEnabled()){ printNotification(n); } /* * If this server is looking, then send proposed leader */ if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ recvqueue.offer(n); /* * Send a notification back if the peer that sent this * message is also looking and its logical clock is * lagging behind. */ if((ackstate == QuorumPeer.ServerState.LOOKING) && (n.electionEpoch < logicalclock.get())){ Vote v = getVote(); QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, v.getId(), v.getZxid(), logicalclock.get(), self.getPeerState(), response.sid, v.getPeerEpoch(), qv.toString().getBytes()); //将消息放到队列中,等待发送 sendqueue.offer(notmsg); } } else { /* * If this server is not looking, but the one that sent the ack * is looking, then send back what it believes to be the leader. */ //获取当前的投票 Vote current = self.getCurrentVote(); if(ackstate == QuorumPeer.ServerState.LOOKING){ if(LOG.isDebugEnabled()){ LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}", self.getId(), response.sid, Long.toHexString(current.getZxid()), current.getId(), Long.toHexString(self.getQuorumVerifier().getVersion())); } QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend( ToSend.mType.notification, current.getId(), current.getZxid(), current.getElectionEpoch(), self.getPeerState(), response.sid, current.getPeerEpoch(), qv.toString().getBytes()); //将消息放到队列中,等待发送 sendqueue.offer(notmsg); } } } } catch (InterruptedException e) { LOG.warn("Interrupted Exception while waiting for new message" + e.toString()); } } LOG.info("WorkerReceiver is down"); } } /** * This worker simply dequeues a message to send and * and queues it on the manager's queue. */ class WorkerSender extends ZooKeeperThread { //是否终止 volatile boolean stop; //服务器之间的连接 QuorumCnxManager manager; WorkerSender(QuorumCnxManager manager){ super("WorkerSender"); this.stop = false; this.manager = manager; } public void run() { while (!stop) { try { //sendqueue中获取消息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; //不为空则进行处理 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); } /** * Called by run() once there is a new message to send. * * @param m message to send */ void process(ToSend m) { //构建消息 ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData); //发送消息 manager.toSend(m.sid, requestBuffer); } } WorkerSender ws; WorkerReceiver wr; Thread wsThread = null; Thread wrThread = null; /** * Constructor of class Messenger. * * @param manager Connection manager */ Messenger(QuorumCnxManager manager) { //创建 WorkerSender this.ws = new WorkerSender(manager); //创建新线程 this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); //守护线程 this.wsThread.setDaemon(true); //创建WorkerReceiver this.wr = new WorkerReceiver(manager); this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); //设置守护线程 this.wrThread.setDaemon(true); } /** * Starts instances of WorkerSender and WorkerReceiver */ void start(){ this.wsThread.start(); this.wrThread.start(); } /** * Stops instances of WorkerSender and WorkerReceiver */ void halt(){ this.ws.stop = true; this.wr.stop = true; } }

类属性

//日志 private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class); /** * Determine how much time a process has to wait * once it believes that it has reached the end of * leader election. */ //完成leader election的等待时间 final static int finalizeWait = 200; /** * Upper bound on the amount of time between two consecutive * notification checks. This impacts the amount of time to get * the system up again after long partitions. Currently 60 seconds. */ //两个连续 通知之间的最大时间 final static int maxNotificationInterval = 60000; /** * This value is passed to the methods that check the quorum * majority of an established ensemble for those values that * should not be taken into account in the comparison * (electionEpoch and zxid). */ final static int IGNOREVALUE = -1; /** * Connection manager. Fast leader election uses TCP for * communication between peers, and QuorumCnxManager manages * such connections. */ //管理服务器之间的连接 QuorumCnxManager manager; // 选票发送队列,⽤于保存待发送的选票 LinkedBlockingQueue<ToSend> sendqueue; // 选票接收队列,⽤于保存接收到的外部投票 LinkedBlockingQueue<Notification> recvqueue; // 投票者 QuorumPeer self; Messenger messenger; // 逻辑时钟 volatile long logicalclock; /* Election instance */ // 推选的leader的id long proposedLeader; // 推选的leader的zxid long proposedZxid; // 推选的leader的选举周期 long proposedEpoch; // 是否停⽌选举 volatile boolean stop;

4.sendNotififications

private void sendNotifications() { //遍历投票参与者集合 for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); //构建对象 ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } //放到队列等待发送 sendqueue.offer(notmsg); } }

5.totalOrderPredicate

该函数将接受的投票与自身投票进行pk,查看是否消息中包含的服务器id是否更优,其按照epoch,zxid,id的优先级进行pk

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); //使用投票器看权重是否为0 if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId))))); }

6.termPredicate

判断leader选举是否结束,是否有一半以上的服务器选出了leader

private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) { SyncedLearnerTracker voteSet = new SyncedLearnerTracker(); voteSet.addQuorumVerifier(self.getQuorumVerifier()); if (self.getLastSeenQuorumVerifier() != null && self.getLastSeenQuorumVerifier().getVersion() > self .getQuorumVerifier().getVersion()) { voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier()); } /* * First make the views consistent. Sometimes peers will have different * zxids for a server depending on timing. */ for (Map.Entry<Long, Vote> entry : votes.entrySet()) { / 将等于当前投票的项放⼊set if (vote.equals(entry.getValue())) { voteSet.addAck(entry.getKey()); } } //统计set,查看投某个id的票数是否超过⼀半 return voteSet.hasAllQuorums(); }

概念补充

1.zxid概念补充

ZooKeeper节点状态改变的每一个操作都将使节点接收到一个Zxid格式的时间戳,并且这个时间戳全局有序。也就是说,每个对节点的改变都将产生一个唯一的Zxid。

cZxid概念

对应为该节点的创建时间(Create)

mZxid概念

对应该节点的最近一次修改的时间(Mofify)

与其子节点无关

问题

jar包报错

要把项目中Build里面的jar包在项目中引入一下

作业的资料查找

https://segmentfault.com/a/1190000019670015

https://blog.csdn.net/sqh201030412/article/details/51446434?utm_medium=distribute.pc_relevant.none-task-blog-baidulandingword-1&spm=1001.2101.3001.4242

单词

subscribe 订阅,捐款 Access Control Lists,ACL 访问控制列表

Latch 插销,锁

election 选举

Notification 通知

extract通知

Verifier 检验人

vote 投票

Quorum 大多数

Predicate 断言,暗示

Epoch 时代

Interval 间隔,区间

propos 关于评论

logical逻辑

Policy 政策

Retry 停止重试

Exponential 指数

guarantee保证

tribute 奉献,致敬

最新回复(0)