Zookeeper的Leader 选举实现底层原理

tech2022-07-14  179

目录

1. zookeeper的集群角色2. zookeeper的事务流程3. ZAB协议3.1 原子广播实现原理3.2 奔溃恢复实现原理3.3 ZXID作用 4. leader 选举流程5. leader选举源码分析5.1 入口main方法5.1 初始化逻辑initializeAndRun方法5.3 集群下的runFromConfig方法5.4 调用start方法:5.4.1 载入本地数据 loadDataBase方法5.4.2 初始化选举算法 leaderElection5.4.3 选举算法的初始化 createElectionAlgorithm()方法5.4.3.1 FastLeaderElection初始化 5.4.4 super.start方法5.4.4.1 lookForLeader方法5.4.4.1.1 选票比较方法 totalOrderPredicate方法5.4.4.1.2 选票方法 termPredicate方法5.4.4.1.3 广播sendNotifications方法 5.4.4.2 选举过程 5.4.5 流程图

1. zookeeper的集群角色

leader leader是zookeeper集群的核心。 1 事务请求的唯一调度的矗立着,他需要保证事务处理的顺序性 2 集群内部各个服务器的调度者 follower 1 处理客户端非事务请求以及转发事务请求给leader服务器 2 参与事务请求提议的proposal投票 【客户端的一个事务请求需要半数服务投票通过才能通知leader commit,leader会发起一个提案,要求follower投票】 3 参与leader选举的投票 observer 1 观察zookeeper集群中最新状态的变化,并且把这些状态同步到observer服务器上。 2 增加observer不影响集群事务处理能力,同时能提升集群的非事务处理能力。

2. zookeeper的事务流程

zookeeper的集群组成 zookeeper一般是由2n+1台服务器组成

leader选举 1)leaderElection 2)AuthFastLeaderElection 3)FastLeaderElection serverID :配置server集群的时候给定服务器的标识id myid zxid:服务器它运行时产生的数据ID,zxid值越大,标识数据越新 Epoch:选举轮次 sid server的状态:Looking,Following,Observering,Leading leader选举 1)leaderElection 2)AuthFastLeaderElection 3)FastLeaderElection serverID :配置server集群的时候给定服务器的标识id myid zxid:服务器它运行时产生的数据ID,zxid值越大,标识数据越新 Epoch:选举轮次 sid server的状态:Looking,Following,Observering,Leading

如上图,在zookeeper中,客户端会随机向zookeeper中的某个几点发起请求,如果是读请求,就直接从当前节点中读取数据,如果是写请求,那么这个request会被转发给leader提交事务,然后leader会广播此事务,向所有的follower发起proposal,只要超过半数的返回ack即响应,那么leader就会commit这个写的事务。 通常 zookeeper 是由 2n+1 台 server 组成,每个 server 都知道彼此的存在。对于 2n+1 台 server,只要有 n+1 台(大多数)server 可用,整个系统保持可用。我们已经了解到,一个 zookeeper 集群如果要对外提供可用的服务,那么集群中必须要有过半的机器正常工作并且彼此之间能够正常通信,基于这个特性,如果向搭建一个能够允许 F 台机器down 掉的集群,那么就要部署 2*F+1 台服务器构成的zookeeper 集群。因此 3 台机器构成的 zookeeper 集群,能够在挂掉一台机器后依然正常工作。一个 5 台机器集群的服务,能够对 2 台机器down掉的情况下进行容灾。如果一台由 6 台服务构成的集群,同样只能挂掉 2 台机器。因此,5 台和 6 台在容灾能力上并没有明显优势,反而增加了网络通信负担。系统启动时,集群中的 server 会选举出一台server 为 Leader,其它的就作为 follower(这里先不考虑observer 角色)。

之所以要满足这样一个等式,是因为一个节点要成为集群中的 leader,需要有超过及群众过半数的节点支持,这个涉及到 leader 选举算法。同时也涉及到事务请求的提交投票。

所有事务请求必须由一个全局唯一的服务器来协调处理,这个服务器就是 Leader 服务器,其他的服务器就是follower。leader 服务器把客户端的失去请求转化成一个事务 Proposal(提议),并把这个 Proposal 分发给集群中的所有 Follower 服务器。之后 Leader 服务器需要等待所有Follower 服务器的反馈,一旦超过半数的 Follower 服务器进行了正确的反馈,那么 Leader 就会再次向所有的Follower 服务器发送 Commit 消息,要求各个 follower 节点对前面的一个 Proposal 进行提交;

3. ZAB协议

ZAB(Zookeeper Atomic Broadcast) 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广播协议。在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性,基于该协议,ZooKeeper 实现了一种主备模式的系统架构来保持集群中各个副本之间的数据一致性。ZAB 协议包含两种基本模式,分别是 1) 奔溃恢复 2)原子广播

当整个集群在启动时,或者当 leader 节点出现网络中断、崩溃等情况时,ZAB 协议就会进入恢复模式并选举产生新的 Leader,当 leader 服务器选举出来后,并且集群中有过半的机器和该 leader 节点完成数据同步后(同步指的是数据同步,用来保证集群中过半的机器能够和 leader 服务器的数据状态保持一致),ZAB 协议就会退出恢复模式。当集群中已经有过半的 Follower 节点完成了和 Leader 状态同步以后,那么整个集群就进入了消息广播模式。这个时候,在 Leader 节点正常工作时,启动一台新的服务器加入到集群,那这个服务器会直接进入数据恢复模式,和 leader 节点进行数据同步。同步完成后即可正常对外提供非事务请求的处理。

3.1 原子广播实现原理

消息广播的过程实际上是一个简化版本的二阶段提交过程。如下图:

leader 接收到消息请求后,将消息赋予一个全局唯一的64 位自增 id,叫:zxid,通过 zxid 的大小比较既可以实现因果有序这个特征

leader 为每个 follower 准备了一个 FIFO 队列(通过 TCP协议来实现,以实现了全局有序这一个特点)将带有 zxid的消息作为一个提案(proposal)分发给所有的 follower

当 follower 接收到 proposal,先把 proposal 写到磁盘,写入成功以后再向 leader 回复一个 ack

当 leader 接收到合法数量(超过半数节点)的 ACK 后,leader 就会向这些 follower 发送 commit 命令,同时会在本地执行该消息

当 follower 收到消息的 commit 命令以后,会提交该消息。

3.2 奔溃恢复实现原理

ZAB 协议的这个基于原子广播协议的消息广播过程,在正常情况下是没有任何问题的,但是一旦 Leader 节点崩溃,或者由于网络问题导致 Leader 服务器失去了过半的Follower 节点的联系(leader 失去与过半 follower 节点联系,可能leader 节点和 follower 节点之间产生了网络分区,那么此时的 leader 不再是合法的 leader 了),那么就会进入到崩溃恢复模式。在 ZAB 协议中,为了保证程序的正确运行,整个恢复过程结束后需要选举出一个新的Leader为了使 leader 挂了后系统能正常工作,需要解决以下两个问题.。   1. 已经被处理的消息不能丢失:当leader收到合法数量follower的ACKS之后,就会向各个follower广播COMMIT命令,同事也会在本地执行COMMIT并且向连接的客户端返回成功。但是如果各个follower在收到commit命令前leader挂了,导致剩下的服务器并没有执行到这条信息。leader对事务发起来commit操作,但是该消息在follower1上执行了,但是follower2没收到commit就挂了,而实际上客户端已经收到该事务处理成功的回执了。所以zab协议下需要保证所有的机器都要执行这个事务消息。 2.被丢弃的消息不能再次出现:当 leader 接收到消息请求生成 proposal 后就挂了,其他 follower 并没有收到此 proposal,因此经过恢复模式重新选了 leader 后,这条消息是被跳过的。 此时,之前挂了的 leader 重新启动并注册成了follower,他保留了被跳过消息的 proposal 状态,与整个系统的状态是不一致的,需要将其删除。

ZAB 协议需要满足上面两种情况,就必须要设计一个leader 选举算法:能够确保已经被 leader 提交的事务Proposal能够提交、同时丢弃已经被跳过的事务Proposal。针对这个要求: 如果 leader 选举算法能够保证新选举出来的 Leader 服务器拥有集群中所有机器最高编号(ZXID 最大)的事务Proposal,那么就可以保证这个新选举出来的 Leader 一定具有已经提交的提案。因为所有提案被 COMMIT 之前必须有超过半数的 follower ACK,即必须有超过半数节点的服务器的事务日志上有该提案的 proposal,因此,只要有合法数量的节点正常工作,就必然有一个节点保存了所有被 COMMIT 消息的 proposal 状态另外一个,zxid 是 64 位,高 32 位是 epoch 编号,每经过一次 Leader 选举产生一个新的 leader,新的 leader 会将epoch 号+1,低 32 位是消息计数器,每接收到一条消息这个值+1,新 leader 选举后这个值重置为 0.这样设计的好处在于老的 leader 挂了以后重启,它不会被选举为 leader,因此此时它的 zxid 肯定小于当前新的 leader。当老的leader 作为 follower 接入新的 leader 后,新的 leader 会让它将所有的拥有旧的 epoch 号的未被 COMMIT 的proposal 清除。

3.3 ZXID作用

zxid,也就是事务 id,为了保证事务的顺序一致性,zookeeper 采用了递增的事务 id 号(zxid)来标识事务。所有的提议(proposal)都在被提出的时候加上了 zxid。实现中 zxid 是一个 64 位的数字,它高 32 位是 epoch(ZAB 协议通过epoch 编号来区分 Leader 周期变化的策略)用来标识 leader 关系是否改变,每次一个 leader 被选出来,它都会有一个新的epoch=(原来的 epoch+1),标识当前属于那个 leader 的统治时期。低 32 位用于递增计数。epoch:可以理解为当前集群所处的年代或者周期,每个leader 就像皇帝,都有自己的年号,所以每次改朝换代,leader 变更之后,都会在前一个年代的基础上加 1。这样就算旧的 leader 崩溃恢复之后,也没有人听他的了,因为follower 只听从当前年代的 leader 的命令。 查看epoch

启动一个 zookeeper 集群。在 /“dataDir”/zookeeper/VERSION-2 路 径 下 会 看 到 一 个currentEpoch 文件。文件中显示的是当前的 epoch把 leader 节点停机,这个时候在看 currentEpoch 会有变化。 随着每次选举新的 leader,epoch 都会发生变化

4. leader 选举流程

Leader 选举会分两个过程启动的时候的 leader 选举、leader 崩溃的时候的的选举服务器启动时的 leader 选举。每个节点启动的时候状态都是 LOOKING,处于观望状态,接下来就开始进行选主流程进行 Leader 选举,至少需要两台机器,我们选取 3 台机器组成的服务器集群为例。 在集群初始化阶段,当有一台服务器 Server1 启动时,它本身是无法进行和完成 Leader 选举,当第二台服务器 Server2 启动时,这个时候两台机器可以相互通信,每台机器都试图找到 Leader,于是进入 Leader 选举过程。 选举过程如下:   (1) 每个 Server 发出一个投票。由于是初始情况,Server1和 Server2 都会将自己作为 Leader 服务器来进行投票,每次投票会包含所推举的服务器的 myid 和 ZXID、epoch,使用(myid, ZXID,epoch)来表示,此时 Server1的投票为(1, 0),Server2 的投票为(2, 0),然后各自将这个投票发给集群中其他机器。

(2) 接受来自各个服务器的投票。集群的每个服务器收到投票后,首先判断该投票的有效性,如检查是否是本轮投票(epoch)、是否来自LOOKING状态的服务器。

(3) 处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行 PK,PK 规则如下

i. 优先检查 ZXID。ZXID 比较大的服务器优先作为Leader

ii. 如果 ZXID 相同,那么就比较 myid。myid 较大的服务器作为 Leader 服务器。

对于 Server1 而言,它的投票是(1, 0),接收 Server2的投票为(2, 0),首先会比较两者的 ZXID,均为 0,再比较 myid,此时 Server2 的 myid 最大,于是更新自己的投票为(2, 0),然后重新投票,对于 Server2 而言,它不需要更新自己的投票,只是再次向集群中所有机器发出上一次投票信息即可。

(4) 统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接受到相同的投票信息,对于 Server1、Server2 而言,都统计出集群中已经有两台机器接受了(2, 0)的投票信息,此时便认为已经选出了 Leader。

(5) 改变服务器状态。一旦确定了 Leader,每个服务器就会更新自己的状态,如果是 Follower,那么就变更为FOLLOWING,如果是 Leader,就变更为 LEADING。 运行过程中的 leader 选举:

当集群中的 leader 服务器出现宕机或者不可用的情况时,那么整个集群将无法对外提供服务,而是进入新一轮的Leader 选举,服务器运行期间的 Leader 选举和启动时期的 Leader 选举基本过程是一致的。

(1) 变更状态。Leader 挂后,余下的非 Observer 服务器都会将自己的服务器状态变更为 LOOKING,然后开始进入 Leader 选举过程。

(2) 每个 Server 会发出一个投票。在运行期间,每个服务器上的 ZXID 可能不同,此时假定 Server1 的 ZXID 为123,Server3的ZXID为122;在第一轮投票中,Server1和 Server3 都会投自己,产生投票(1, 123),(3, 122),然后各自将投票发送给集群中所有机器。接收来自各个服务器的投票。与启动时过程相同。

(3) 处理投票。与启动时过程相同,此时,Server1 将会成为 Leader。

(4) 统计投票。与启动时过程相同。   (5) 改变服务器的状态。与启动时过程相同

5. leader选举源码分析

有了理论基础以后,我们先读一下源码(zookeeper-3.4.12),看看他的实现逻辑。首先我们需要知道源码入口,也就是Zookeeper启动的主类: QuorumPeerMain 类的 main 方法开始:

5.1 入口main方法

public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); //初始化主要逻辑 try{ main.initializeAndRun(args); }catch (Exception e){ } System.exit(0); }

5.1 初始化逻辑initializeAndRun方法

做了两件事情: 1)启动后台定时任务异步执行清除任务,删除垃圾数据 2)判断是单机还是集群,如果是集群的话执行runFromConfig方法

protected void initializeAndRun(String[] args) throws QuorumPeerConfig.ConfigException, IOException { QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { config.parse(args[0]); } // 启动后台定时任务异步执行清除任务,删除垃圾数据 // Start and schedule the the purge task DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config.getDataDir(), config.getDataLogDir(), config.getSnapRetainCount(), config.getPurgeInterval()); purgeMgr.start(); //判断是集群还是单机 if (args.length == 1 && config.servers.size() > 0) { //集群 this.runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); //单机 ZooKeeperServerMain.main(args); } }

5.3 集群下的runFromConfig方法

作用: 1)初始化NIOServerCnxnFactory 2)进入一系列配置,包括配置myid及供客户端端口 3) 启动初始化 启动主线程

public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException var4) { LOG.warn("Unable to register log4j JMX control", var4); } LOG.info("Starting quorum peer"); try { // 初始化NIOServerCnxnFactory ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory(); cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns()); // 逻辑主线程 进行投票,选举 this.quorumPeer = this.getQuorumPeer(); // 进入一系列的配置 this.quorumPeer.setQuorumPeers(config.getServers()); this.quorumPeer.setTxnFactory(new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir()))); this.quorumPeer.setElectionType(config.getElectionAlg()); //配置 myid this.quorumPeer.setMyid(config.getServerId()); this.quorumPeer.setTickTime(config.getTickTime()); this.quorumPeer.setInitLimit(config.getInitLimit()); this.quorumPeer.setSyncLimit(config.getSyncLimit()); this.quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs()); this.quorumPeer.setCnxnFactory(cnxnFactory); this.quorumPeer.setQuorumVerifier(config.getQuorumVerifier()); // 为客户端提供写的server 即2181访问端口的访问功能 this.quorumPeer.setClientPortAddress(config.getClientPortAddress()); this.quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout()); this.quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout()); this.quorumPeer.setZKDatabase(new ZKDatabase(this.quorumPeer.getTxnFactory())); this.quorumPeer.setLearnerType(config.getPeerType()); this.quorumPeer.setSyncEnabled(config.getSyncEnabled()); this.quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl); if (this.quorumPeer.isQuorumSaslAuthEnabled()) { this.quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl); this.quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl); this.quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal); this.quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext); this.quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } this.quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); // 初始化的工作 this.quorumPeer.initialize(); // 启动主线程,QuorumPeer 重写了 Thread.start 方法 this.quorumPeer.start(); //使得线程之间的并行执行变为串行执行 this.quorumPeer.join(); } catch (InterruptedException var3) { LOG.warn("Quorum Peer interrupted", var3); } }

5.4 调用start方法:

作用: 1)载入本地数据 2)启动zk线程 3)启动选主

public synchronized void start() { //载入本地DB数据,主要还是epoch this.loadDataBase(); //启动ZooKeeperThread线程 this.cnxnFactory.start(); //启动leader选举线程 this.startLeaderElection(); super.start(); }

5.4.1 载入本地数据 loadDataBase方法

作用: 1)载入本地数据 2)根据zxid抽离epoch

private void loadDataBase() { File updating = new File(this.getTxnFactory().getSnapDir(), "updatingEpoch"); try { //载入本地数据 this.zkDb.loadDataBase(); // load the epochs 加载ZXID long lastProcessedZxid = this.zkDb.getDataTree().lastProcessedZxid; // 根据zxid的高32位是epoch号,低32位是事务id进行抽离epoch号 long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); try { //从${data}/version-2/currentEpochs文件中加载当前的epoch号 this.currentEpoch = this.readLongFromFile("currentEpoch"); //从 zxid中提取的epoch比文件里的epoch要大的话,并且没有正在修改epoch if (epochOfZxid > this.currentEpoch && updating.exists()) { LOG.info("{} found. The server was terminated after taking a snapshot but before updating current epoch. Setting current epoch to {}.", "updatingEpoch", epochOfZxid); //设置位大的epoch this.setCurrentEpoch(epochOfZxid); if (!updating.delete()) { throw new IOException("Failed to delete " + updating.toString()); } } } catch (FileNotFoundException var8) { this.currentEpoch = epochOfZxid; LOG.info("currentEpoch not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", this.currentEpoch); this.writeLongToFile("currentEpoch", this.currentEpoch); } //如果还比他大 抛出异常 if (epochOfZxid > this.currentEpoch) { throw new IOException("The current epoch, " + ZxidUtils.zxidToString(this.currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid); } else { try { //再比较 acceptedEpoch this.acceptedEpoch = this.readLongFromFile("acceptedEpoch"); } catch (FileNotFoundException var7) { this.acceptedEpoch = epochOfZxid; LOG.info("acceptedEpoch not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation", this.acceptedEpoch); this.writeLongToFile("acceptedEpoch", this.acceptedEpoch); } if (this.acceptedEpoch < this.currentEpoch) { throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(this.acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(this.currentEpoch)); } } } catch (IOException var9) { LOG.error("Unable to load database on disk", var9); throw new RuntimeException("Unable to run quorum server ", var9); } }

5.4.2 初始化选举算法 leaderElection

作用: 1)根据myid zxid epoch选举参数创建voto对象准备选主

public synchronized void startLeaderElection() { try { // 根据myid zxid epoch 3个选举参数创建Voto 对象,准备选举 this.currentVote = new Vote(this.myid, this.getLastLoggedZxid(), this.getCurrentEpoch()); } catch (IOException var4) { RuntimeException re = new RuntimeException(var4.getMessage()); re.setStackTrace(var4.getStackTrace()); throw re; } Iterator var1 = this.getView().values().iterator(); while(var1.hasNext()) { QuorumPeer.QuorumServer p = (QuorumPeer.QuorumServer)var1.next(); if (p.id == this.myid) { this.myQuorumAddr = p.addr; break; } } if (this.myQuorumAddr == null) { throw new RuntimeException("My id " + this.myid + " not in the peer list"); } else { //如果是这个选举策略,代表 LeaderElection选举策略 if (this.electionType == 0) { try { //创建 UDP Socket this.udpSocket = new DatagramSocket(this.myQuorumAddr.getPort()); this.responder = new QuorumPeer.ResponderThread(); this.responder.start(); } catch (SocketException var3) { throw new RuntimeException(var3); } } //根据类型创建选举算法 this.electionAlg = this.createElectionAlgorithm(this.electionType); } }

5.4.3 选举算法的初始化 createElectionAlgorithm()方法

配置选举算法,选举算法有 3 种,可以通过在 zoo.cfg 里面进行配置,默认是 FastLeaderElection 选举

protected Election createElectionAlgorithm(int electionAlgorithm) { Election le = null; // 选择选举策略 //TODO: use a factory rather than a switch switch(electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: //Leader选举IO负责类 this.qcm = this.createCnxnManager(); QuorumCnxManager.Listener listener = this.qcm.listener; if (listener != null) { // 启动已绑定端口的选举线程,等待其他服务器连接 listener.start(); //基于 TCP的选举算法 le = new FastLeaderElection(this, this.qcm); } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; } return (Election)le; }

5.4.3.1 FastLeaderElection初始化

public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) { this.manager = manager; //构造中启动 this.starter(self, manager); } private void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; this.proposedLeader = -1L; this.proposedZxid = -1L; // 投票 发送队列 阻塞 this.sendqueue = new LinkedBlockingQueue(); // 投票 接收队列 阻塞 this.recvqueue = new LinkedBlockingQueue(); this.messenger = new FastLeaderElection.Messenger(manager); }

然后再 Messager 的构造函数里 初始化发送和接受两个线程并且启动线程。

Messenger(QuorumCnxManager manager) { //启动投票发送线程 this.ws = new FastLeaderElection.Messenger.WorkerSender(manager); Thread t = new Thread(this.ws, "WorkerSender[myid=" + FastLeaderElection.this.self.getId() + "]"); t.setDaemon(true); t.start(); //启动投票接收线程 this.wr = new FastLeaderElection.Messenger.WorkerReceiver(manager); t = new Thread(this.wr, "WorkerReceiver[myid=" + FastLeaderElection.this.self.getId() + "]"); t.setDaemon(true); t.start(); }

5.4.4 super.start方法

public void run() { this.setName("QuorumPeer[myid=" + this.getId() + "]" + this.cnxnFactory.getLocalAddress()); LOG.debug("Starting quorum peer"); //通过JMX初始化。来监控一些属性的代码 try { this.jmxQuorumBean = new QuorumBean(this); MBeanRegistry.getInstance().register(this.jmxQuorumBean, (ZKMBeanInfo)null); Iterator var1 = this.getView().values().iterator(); while(var1.hasNext()) { QuorumPeer.QuorumServer s = (QuorumPeer.QuorumServer)var1.next(); if (this.getId() == s.id) { LocalPeerBean p = this.jmxLocalPeerBean = new LocalPeerBean(this); try { MBeanRegistry.getInstance().register(p, this.jmxQuorumBean); } catch (Exception var92) { LOG.warn("Failed to register with JMX", var92); this.jmxLocalPeerBean = null; } } else { RemotePeerBean p = new RemotePeerBean(s); try { MBeanRegistry.getInstance().register(p, this.jmxQuorumBean); } catch (Exception var91) { LOG.warn("Failed to register with JMX", var91); } } } } catch (Exception var95) { LOG.warn("Failed to register with JMX", var95); this.jmxQuorumBean = null; } try { while(this.running) { switch(this.getPeerState()) { case LOOKING: //LOOKING 状态,则进入选举 LOG.info("LOOKING"); if (Boolean.getBoolean("readonlymode.enabled")) { LOG.info("Attempting to start ReadOnlyZooKeeperServer"); // 创建 ReadOnlyZooKeeperServer,但是不立即启动 final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(this.logFactory, this, new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb); //通过 Thread 异步解耦 Thread roZkMgr = new Thread() { public void run() { try { sleep((long)Math.max(2000, QuorumPeer.this.tickTime)); if (QuorumPeer.ServerState.LOOKING.equals(QuorumPeer.this.getPeerState())) { roZk.startup(); } } catch (InterruptedException var2) { QuorumPeer.LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started"); } catch (Exception var3) { QuorumPeer.LOG.error("FAILED to start ReadOnlyZooKeeperServer", var3); } } }; try { //启动 roZkMgr.start(); this.setBCVote((Vote)null); // 通过策略模式来决定当前用哪个算法选举 this.setCurrentVote(this.makeLEStrategy().lookForLeader()); } catch (Exception var89) { LOG.warn("Unexpected exception", var89); this.setPeerState(QuorumPeer.ServerState.LOOKING); } finally { roZkMgr.interrupt(); roZk.shutdown(); } } else { try { this.setBCVote((Vote)null); this.setCurrentVote(this.makeLEStrategy().lookForLeader()); } catch (Exception var88) { LOG.warn("Unexpected exception", var88); this.setPeerState(QuorumPeer.ServerState.LOOKING); } } break; case LEADING: // leader 节点 LOG.info("LEADING"); try { try { this.setLeader(this.makeLeader(this.logFactory)); this.leader.lead(); this.setLeader((Leader)null); } catch (Exception var83) { LOG.warn("Unexpected exception", var83); } break; } finally { if (this.leader != null) { this.leader.shutdown("Forcing shutdown"); this.setLeader((Leader)null); } this.setPeerState(QuorumPeer.ServerState.LOOKING); } case FOLLOWING: // 从节点状态 try { try { LOG.info("FOLLOWING"); this.setFollower(this.makeFollower(this.logFactory)); this.follower.followLeader(); } catch (Exception var84) { LOG.warn("Unexpected exception", var84); } break; } finally { this.follower.shutdown(); this.setFollower((Follower)null); this.setPeerState(QuorumPeer.ServerState.LOOKING); } case OBSERVING: // Observing 针对 Observer角色的节点 try { LOG.info("OBSERVING"); this.setObserver(this.makeObserver(this.logFactory)); this.observer.observeLeader(); } catch (Exception var86) { LOG.warn("Unexpected exception", var86); } finally { this.observer.shutdown(); this.setObserver((Observer)null); this.setPeerState(QuorumPeer.ServerState.LOOKING); } } } } finally { LOG.warn("QuorumPeer main thread exited"); try { MBeanRegistry.getInstance().unregisterAll(); } catch (Exception var82) { LOG.warn("Failed to unregister with JMX", var82); } this.jmxQuorumBean = null; this.jmxLocalPeerBean = null; } }

5.4.4.1 lookForLeader方法

由于是刚刚启动,是 LOOKING 状态。所以走第一条分支。调用 setCurrentVote(makeLEStrategy().lookForLeader());,最终根据上一步选择的策略应该运行 FastLeaderElection 中的选举算法,看一下 lookForLeader()

public Vote lookForLeader() throws InterruptedException { try { this.self.jmxLeaderElectionBean = new LeaderElectionBean(); MBeanRegistry.getInstance().register(this.self.jmxLeaderElectionBean, this.self.jmxLocalPeerBean); } catch (Exception var23) { LOG.warn("Failed to register with JMX", var23); this.self.jmxLeaderElectionBean = null; } if (this.self.start_fle == 0L) { this.self.start_fle = Time.currentElapsedTime(); } try { // 收到的投票 HashMap<Long, Vote> recvset = new HashMap(); // 存储选举结果 HashMap<Long, Vote> outofelection = new HashMap(); int notTimeout = 200; synchronized(this) { // 增加逻辑时钟 this.logicalclock.incrementAndGet(); // 修改自己的zxid epoch this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch()); } LOG.info("New election. My id = " + this.self.getId() + ", proposed zxid=0x" + Long.toHexString(this.proposedZxid)); // 发送投票 this.sendNotifications(); FastLeaderElection.Notification n; // 主循环 直到选举出leader while(this.self.getPeerState() == QuorumPeer.ServerState.LOOKING && !this.stop) { //从IO进程里面 获取投票结果,自己的投票也在里面 n = (FastLeaderElection.Notification)this.recvqueue.poll((long)notTimeout, TimeUnit.MILLISECONDS); // 如果没有获取到足够的通知就一直发送自己的选票,也就是持续进行选举 if (n == null) { // 如果空了 就继续发送 直到选举出leader if (this.manager.haveDelivered()) { this.sendNotifications(); } else { // 消息没发出去,可能其他集群没启动 继续尝试连接 this.manager.connectAll(); } // 延长超时时间 int tmpTimeOut = notTimeout * 2; notTimeout = tmpTimeOut < 60000 ? tmpTimeOut : '\uea60'; LOG.info("Notification time out: " + notTimeout); } // 收到投票消息 查看是否属于本集群内的消息 else if (!this.self.getVotingView().containsKey(n.sid)) { LOG.warn("Ignoring notification from non-cluster member " + n.sid); } else { Vote endVote; Vote var6; // 判断收到消息的节点状态 switch(n.state) { case LOOKING: // 判断epoch 是否大于 logicalclock ,如是,则是新一轮选举 if (n.electionEpoch > this.logicalclock.get()) { // 更新本地logicalclock this.logicalclock.set(n.electionEpoch); // 清空接受队列 recvset.clear(); // 一次性比较 myid epoch zxid 看此消息是否胜出 if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch())) { //投票结束修改票据为 leader票据 this.updateProposal(n.leader, n.zxid, n.peerEpoch); } else { //否则票据不变 this.updateProposal(this.getInitId(), this.getInitLastLoggedZxid(), this.getPeerEpoch()); } // 继续广播票据,让其他节点知道我现在的投票 this.sendNotifications(); } else { //如果是epoch小于当前 忽略 if (n.electionEpoch < this.logicalclock.get()) { if (LOG.isDebugEnabled()) { LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(this.logicalclock.get())); } break; } //如果 epoch 相同 跟上面一样的比较 更新票据 广播票据 if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) { this.updateProposal(n.leader, n.zxid, n.peerEpoch); this.sendNotifications(); } } if (LOG.isDebugEnabled()) { LOG.debug("Adding vote: from=" + n.sid + ", proposed leader=" + n.leader + ", proposed zxid=0x" + Long.toHexString(n.zxid) + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch)); } // 把最终票据放进接受队列 用来做最后判断 recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 判断选举是否结束 默认算法是否超过半数同意 见下面代码 if (!this.termPredicate(recvset, new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch))) { break; } // 一直等待 notification 到达 直到超时就返回null while((n = (FastLeaderElection.Notification)this.recvqueue.poll(200L, TimeUnit.MILLISECONDS)) != null) { if (this.totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, this.proposedLeader, this.proposedZxid, this.proposedEpoch)) { this.recvqueue.put(n); break; } } // 确定 leader if (n == null) { // 修改状态 this.self.setPeerState(this.proposedLeader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState()); //返回最终投票结果 endVote = new Vote(this.proposedLeader, this.proposedZxid, this.logicalclock.get(), this.proposedEpoch); this.leaveInstance(endVote); var6 = endVote; return var6; } break; // 如果收到的选票状态 不是LOOKING 比如刚刚加入已经选举好的集群 // Observer 不参与选举 case OBSERVING: LOG.debug("Notification from observer: " + n.sid); break; case FOLLOWING: case LEADING: // 判断 epoch 是否相同 if (n.electionEpoch == this.logicalclock.get()) { recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); // 投票是否结束 结束的话确认leader 是否有效 // 如果结束 修改自己的投票并且返回 if (this.ooePredicate(recvset, outofelection, n)) { this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState()); endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); this.leaveInstance(endVote); var6 = endVote; return var6; } } //在加入一个已建立的集群之前,确认大多数人都在跟随同一个Leader。 outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state)); if (this.ooePredicate(outofelection, outofelection, n)) { synchronized(this) { this.logicalclock.set(n.electionEpoch); this.self.setPeerState(n.leader == this.self.getId() ? QuorumPeer.ServerState.LEADING : this.learningState()); } endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch); this.leaveInstance(endVote); var6 = endVote; return var6; } break; default: LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)", n.state, n.sid); } } } n = null; return n; } finally { try { if (this.self.jmxLeaderElectionBean != null) { MBeanRegistry.getInstance().unregister(this.self.jmxLeaderElectionBean); } } catch (Exception var20) { LOG.warn("Failed to unregister with JMX", var20); } this.self.jmxLeaderElectionBean = null; LOG.debug("Number of connection processing threads: {}", this.manager.getConnectionThreadCount()); } }

以上代码就是整个选举的核心。

首先更新logicalclock并通过 updateProposal 修改自己的选票信息,并且通过 sendNotifications 进行发送选票。进入主循环进行本轮投票。从recvqueue队列中获取一个投票信息,如果没有获取到足够的选票通知一直发送自己的选票,也就是持续进行选举,否则进入步骤4。判断投票信息中的选举状态: 4.1 LOOKING状态: 4.1.1 如果对方的Epoch大于本地的logicalclock,则更新本地的logicalclock并清空本地投票信息统计箱recvset,并将自己作为候选和投票中的leader进行比较,选择大的作为新的投票,然后广播出去,否则进入下面步骤4.1.2。 4.1.2 如果对方的Epoch小于本地的logicalclock,则忽略对方的投票,重新进入下一轮选举流程,否则进入下面步骤4.1.3。 4.1.3 如果对方的Epoch等于本地的logicalclock,则比较当前本地被推选的leader和投票中的leader,选择大的作为新的投票,然后广播出去。 4.1.4 把对方的投票信息保存到本地投票统计箱recvset中,判断当前被选举的leader是否在投票中占了大多数(大于一半的server数量),如果是则需再等待finalizeWait时间(从recvqueue继续poll投票消息)看是否有人修改了leader的候选,如果有则再将该投票信息再放回recvqueue中并重新开始下一轮循环,否则确定角色,结束选举。 4.2 OBSERVING状态:不参与选举。 4.3 FOLLOWING/LEADING: 4.3.1 如果对方的Epoch等于本地的logicalclock,把对方的投票信息保存到本地投票统计箱recvset中,判断对方的投票信息是否在recvset中占大多数并且确认自己确实为leader,如果是则确定角色,结束选举,否则进入下面步骤4.3.2。 4.3.2 将对方的投票信息放入本地统计不参与投票信息箱outofelection中,判断对方的投票信息是否在outofelection中占大多数并且确认自己确实为leader,如果是则更新logicalclock为当前epoch,并确定角色,结束选举,否则进入下一轮选举。
5.4.4.1.1 选票比较方法 totalOrderPredicate方法
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if (this.self.getQuorumVerifier().getWeight(newId) == 0L) { return false; } else { /*如果以下三种情况之一成立,则返回true: * 1-选票中epoch更高 * 2-选票中epoch与当前epoch相同,但新zxid更高 * 3-选票中epoch与当前epoch相同,新zxid与当前zxid相同服务器id更高。 */ return newEpoch > curEpoch || newEpoch == curEpoch && (newZxid > curZxid || newZxid == curZxid && newId > curId); } }
5.4.4.1.2 选票方法 termPredicate方法
protected boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) { HashSet<Long> set = new HashSet(); Iterator var4 = votes.entrySet().iterator(); // 遍历接收到的集合 把符合当前投票的 放入 Set while(var4.hasNext()) { Map.Entry<Long, Vote> entry = (Map.Entry)var4.next(); if (vote.equals(entry.getValue())) { set.add(entry.getKey()); } } // 统计票据,看是否过半 return this.self.getQuorumVerifier().containsQuorum(set); }
5.4.4.1.3 广播sendNotifications方法
private void sendNotifications() { Iterator var1 = this.self.getView().values().iterator(); while(var1.hasNext()) {// 循环发送 QuorumPeer.QuorumServer server = (QuorumPeer.QuorumServer)var1.next(); // 准备发送的消息实体 AuthFastLeaderElection.ToSend notmsg = new AuthFastLeaderElection.ToSend(AuthFastLeaderElection.ToSend.mType.notification, (long)(sequencer++), this.proposedLeader, this.proposedZxid, this.logicalclock, QuorumPeer.ServerState.LOOKING, ((QuorumPeer.QuorumServer)this.self.getView().get(server.id)).electionAddr); // 使用offer 添加到队列 会被sendWorker线程消费 this.sendqueue.offer(notmsg); } }

5.4.4.2 选举过程

其实在这个投票过程中就涉及到几个类,FastLeaderElection:FastLeaderElection 实现了 Election 接口,实现各服务器之间基于 TCP 协议进行选举Notification:内部类,Notification 表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了被选举者的 id、zxid、选举周期等信息ToSend:ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的 id、zxid、选举周期等信息Messenger : Messenger 包 含 了 WorkerReceiver 和WorkerSender 两个内部类;WorkerReceiver 实现了 Runnable 接口,是选票接收器。其会不断地从 QuorumCnxManager 中获取其他服务器发来的选举消息,并将其转换成一个选票,然后保存到recvqueue 中WorkerSender 也实现了 Runnable 接口,为选票发送器,其会不断地从 sendqueue 中获取待发送的选票,并将其传递到底层 QuorumCnxManager 中

至于 Zookeeper是如何接受请求的,其实我们可以看一下代码   其中 cnxnFactory.start(); 就是启动了服务端的接受请求的线程,默认实现有两个 NIO 及 Netty:    默认是nio

public static ServerCnxnFactory createFactory() throws IOException { String serverCnxnFactoryName = System.getProperty("zookeeper.serverCnxnFactory"); if (serverCnxnFactoryName == null) { serverCnxnFactoryName = NIOServerCnxnFactory.class.getName(); } try { ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory)Class.forName(serverCnxnFactoryName).getDeclaredConstructor().newInstance(); LOG.info("Using {} as server connection factory", serverCnxnFactoryName); return serverCnxnFactory; } catch (Exception var3) { IOException ioe = new IOException("Couldn't instantiate " + serverCnxnFactoryName); ioe.initCause(var3); throw ioe; } }

5.4.5 流程图

最新回复(0)