Continuously updated. Jump link:
https://blog.csdn.net/qq_35349982/category_10317485.html
ZooKeeper
1. Introduction
ZooKeeper is a solution for distributed data consistency. On top of it, distributed applications can implement features such as data publish/subscribe, load balancing, naming services, cluster management, distributed locks, and distributed queues.
2. Installation
2.1 Standalone installation
1. Download
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
2. Extract
tar -zxvf zookeeper-3.4.13.tar.gz
cd zookeeper-3.4.13
3. Edit the configuration file
cd zookeeper-3.4.13/conf
cp zoo_sample.cfg zoo.cfg
cd /usr/local/src/zookeeper-3.4.13
mkdir data
mkdir log
vi zoo.cfg
dataDir=/usr/local/src/zookeeper-3.4.13/data
dataLogDir=/usr/local/src/zookeeper-3.4.13/log
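For reference, a minimal zoo.cfg for this standalone setup could look like the following (tickTime/initLimit/syncLimit are the defaults from zoo_sample.cfg; the paths assume the layout above):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/src/zookeeper-3.4.13/data
dataLogDir=/usr/local/src/zookeeper-3.4.13/log
clientPort=2181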
4. Configure environment variables
export ZOOKEEPER_INSTALL=/usr/local/src/zookeeper-3.4.13
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
5. Start
cd /usr/local/src/zookeeper-3.4.13/bin
./zkServer.sh start
2.2 Cluster installation
1. Rename the directory
mv zookeeper-3.4.14 zookeeper01
2. Make two more copies
cp -r zookeeper01/ zookeeper02
cp -r zookeeper01/ zookeeper03
3. Create the data and logs directories and rename the sample config (in each copy)
mkdir data
mkdir logs
cd conf
mv zoo_sample.cfg zoo.cfg
4. Edit the configuration file (do this in each of the three copies)
clientPort=2181    (use a different clientPort per copy, e.g. 2181/2182/2183, since they run on the same machine)
dataDir=/usr/local/src/zookeeper02/data
dataLogDir=/usr/local/src/zookeeper02/logs
5. Configure the cluster
First check the machine's IP (use the internal address shown by ifconfig, not the public server IP):
ifconfig
In the data directory of each ZooKeeper copy, create a myid file whose content is 1, 2 and 3 respectively; this file records each server's ID:
touch myid
In the zoo.cfg of every copy, list the cluster servers:
server.1=172.17.153.160:2881:3881
server.2=172.17.153.160:2882:3882
server.3=172.17.153.160:2883:3883
(After startup, jps can be used to check that the QuorumPeerMain processes are running.)
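For example, assuming the three copies live under /usr/local/src, the myid files can be written in one step (the paths are illustrative and must match your layout):
echo 1 > /usr/local/src/zookeeper01/data/myid
echo 2 > /usr/local/src/zookeeper02/data/myid
echo 3 > /usr/local/src/zookeeper03/data/myid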
6. Start
./zkServer.sh start
./zkServer.sh status
./zkServer.sh stop
Installation troubleshooting
1. Stop the process occupying the port (for example 8080 or the client port 2181): find it and kill it
netstat -nltp | grep 2181
kill -9 3027
2. Make sure the required ports are open and check the firewall settings
3. Check the logs: ./zkServer.sh start-foreground runs the server in the foreground and prints errors directly
4. There is an .out file in the log directory; check it for error messages
5. If a port was occupied, delete all the old files under the data and logs directories
3. Basic commands
3.1 Command line
1.1 Create nodes
./zkCli.sh
ls /
create -s /zk-test 123        (sequential node)
create -e /zk-temp 123        (ephemeral node)
create /zk-permanent 123      (persistent node)
1.2 Read a node
get /zk-permanent
1.3 Update and delete nodes
get /zk-permanent
set /zk-permanent 456
delete /zk-permanent
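Besides get/set/delete, the stat command prints only a node's metadata (cZxid, mZxid, dataVersion and so on), which is handy for checking the version before a conditional update:
stat /zk-permanent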
3.2 The native ZooKeeper client
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.14</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.2</version>
</dependency>
2.1 Connect and create a node
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class CreateNote implements Watcher {

    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        zooKeeper = new ZooKeeper("47.95.1.96:2181", 5000, new CreateNote());
        System.out.println(zooKeeper.getState());
        Thread.sleep(Integer.MAX_VALUE);
    }

    // called back once the session is established
    public void process(WatchedEvent watchedEvent) {
        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
            System.out.println("process() was invoked...");
            try {
                createNoteSync();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void createNoteSync() throws KeeperException, InterruptedException {
        String note_persistent = zooKeeper.create("/my-persistent", "persistent node content".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("node created");
    }
}
2.2 Read and update
private void updateNoteSync() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.setData("/my-persistent", "I created a new node".getBytes(), -1);

    byte[] data = zooKeeper.getData("/my-persistent", false, null);
    System.out.println("value before the update: " + new String(data));

    Stat stat1 = zooKeeper.setData("/my-persistent", "I updated the node I created".getBytes(), -1);

    byte[] data2 = zooKeeper.getData("/my-persistent", false, null);
    System.out.println("value after the update: " + new String(data2));
}
2.3 Delete
private void deleteNoteSync() throws KeeperException, InterruptedException {
    Stat stat = zooKeeper.exists("/lg-persistent/c1", false);
    System.out.println(stat == null ? "the node does not exist" : "the node exists");

    if (stat != null) {
        zooKeeper.delete("/lg-persistent/c1", -1);
    }

    Stat stat2 = zooKeeper.exists("/lg-persistent/c1", false);
    System.out.println(stat2 == null ? "the node does not exist" : "the node exists");
}
3.3 Curator
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
3.1 Connect
RetryPolicy exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("47.95.1.96:2181")
        .sessionTimeoutMs(50000)
        .connectionTimeoutMs(30000)
        .retryPolicy(exponentialBackoffRetry)
        .namespace("base")
        .build();
client.start();
System.out.println("session 2 created");

String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path, "init".getBytes());
System.out.println("node created recursively, path: " + s);
3.2 Create a node and read its status
String path = "/lg-curator/c1";
String s = client.create().creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT)
        .forPath(path, "init".getBytes());
System.out.println("node created recursively, path: " + s);

byte[] bytes = client.getData().forPath(path);
System.out.println("node data: " + new String(bytes));

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath(path);
System.out.println("node status info: " + stat);
3.3 Update
int version = client.setData().withVersion(stat.getVersion()).forPath(path, "update 1".getBytes()).getVersion();
System.out.println("current latest version: " + version);

byte[] bytes2 = client.getData().forPath(path);
System.out.println("data after the update: " + new String(bytes2));

// this second call still uses the old version number, so it fails with a bad-version error
client.setData().withVersion(stat.getVersion()).forPath(path, "update 2".getBytes());
3.4 Delete
String path = "/lg-curator";
client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath(path);
System.out.println("deleted node: " + path);
3.5 Three kinds of listeners
NodeCache: watches a single node; it fires for create, update and delete of the specified path.
PathChildrenCache: watches the first-level children of the specified path; it does not fire for operations on the node itself, only for create, update and delete of its children.
TreeCache: treats the specified path as the root (ancestor) node and watches operations on all of its descendants, i.e. a whole subtree; the watch depth can be configured, up to a maximum of 2147483647 (Integer.MAX_VALUE).
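For comparison with the PathChildrenCache example below, a NodeCache listener on a single node might look roughly like this (a sketch against Curator 2.12; NodeCache lives in the curator-recipes artifact, which is not in the dependency list above, and the path /config/app is only an illustration):

final NodeCache nodeCache = new NodeCache(client, "/config/app");
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        // fired when the watched node is created, updated or deleted
        if (nodeCache.getCurrentData() != null) {
            System.out.println("new data: " + new String(nodeCache.getCurrentData().getData()));
        } else {
            System.out.println("node deleted");
        }
    }
});
nodeCache.start(true); // true = load the initial data immediately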
1. PathChildrenCache example
PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, "/serviceList", false);
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
pathChildrenCache.getListenable().addListener(new CilentListener());
public class CilentListener implements PathChildrenCacheListener {

    @Override
    public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
        PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
            System.out.println(pathChildrenCacheEvent.getData().getPath());
        }
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
            System.out.println("added " + pathChildrenCacheEvent.getData().getPath());
            String path = pathChildrenCacheEvent.getData().getPath();
            String[] split = path.split("/");
            String serviceValue = CuratorUtils.findServiceRegister(pathChildrenCacheEvent.getData().getPath());
            Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap;
            serverAddressMap.put(split[split.length - 1], serviceValue);
        }
        if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
            String path = pathChildrenCacheEvent.getData().getPath();
            String[] split = path.split("/");
            System.out.println("removed " + pathChildrenCacheEvent.getData().getPath());
            Map<String, String> serverAddressMap = ConsumerBoot.serverAddressMap;
            serverAddressMap.remove(split[split.length - 1]);
            System.out.println("entries left in the map: " + serverAddressMap.size());
        }
    }
}
Utility class
package com.lagou.zookeeper;

import com.lagou.ZKConstant;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

import java.nio.ByteBuffer;
import java.util.*;
public class CuratorUtils {

    private static String serverList = "serviceList";
    private static int serverNum = 0;

    public static CuratorFramework build() {
        CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .namespace(ZKConstant.ZK_NAMESPACE)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
        return curatorFramework;
    }

    public static CuratorFramework serviceRegister(String address, int port) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        String serviceAddress = address + ":" + port;
        Stat s = client.checkExists().forPath("/" + serverList);
        if (s == null) {
            client.create().withMode(CreateMode.PERSISTENT).forPath("/" + serverList);
        }
        client.create().creatingParentContainersIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath("/" + serverList + "/" + String.valueOf(serverNum + 1), serviceAddress.getBytes());
        return client;
    }

    public static List<String> findServiceRegList() throws Exception {
        CuratorFramework client = CuratorUtils.build();
        List<String> nodeList = client.getChildren().forPath("/" + serverList);
        return nodeList;
    }

    public static void deleteServiceRegister(String path) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        client.delete().forPath(path);
    }

    public static String findServiceRegister(String path) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        byte[] bytes = client.getData().forPath(path);
        return new String(bytes);
    }

    public static void addServiceTime(String path, String dateStr) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, dateStr.getBytes());
        } else {
            client.setData().forPath(path, dateStr.getBytes());
        }
    }

    public static void addReponseTime(String path, String dateStr) throws Exception {
        CuratorFramework client = CuratorUtils.build();
        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentContainersIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, dateStr.getBytes());
        } else {
            client.setData().forPath(path, dateStr.getBytes());
        }
    }

    public static byte[] longToBytes(long x) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(x);
        return buffer.array();
    }

    public static Object getMinValue(Map<String, Integer> map) {
        if (map == null) return null;
        Collection<Integer> c = map.values();
        Object[] obj = c.toArray();
        Arrays.sort(obj);
        return obj[0];
    }

    public static void main(String[] args) throws Exception {
        int numbers[] = {1, 2, 5};
        Random random = new Random();
        int i = random.nextInt(numbers.length);
        System.out.println(i);
        System.out.println(numbers[i]);
    }
}
4. Application scenarios
1. Data publish/subscribe
Publish/subscribe is essentially a configuration center. Data can either be pushed to clients or pulled by them.
Configuration data kept this way has three characteristics: it is small, it changes dynamically at runtime, and it is shared by every machine in the cluster and must stay consistent.
The two basic operations are fetching the configuration and reacting to configuration changes; a minimal pull-model sketch follows.
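A minimal sketch of the pull model with the native client (the node path /config/app and the class name are illustrative, not from the original post): the client reads the node and registers a watch, and on every NodeDataChanged event it re-reads the data, which re-registers the watch.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ConfigWatcher implements Watcher {
    private final ZooKeeper zk;

    public ConfigWatcher(ZooKeeper zk) { this.zk = zk; }

    public String load() throws KeeperException, InterruptedException {
        // passing "this" as the watcher re-registers the watch on every read
        byte[] data = zk.getData("/config/app", this, null);
        return new String(data);
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeDataChanged) {
            try {
                System.out.println("config changed: " + load());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}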
2. Naming service
ZooKeeper's create API can create sequential nodes and returns the node's full name, which makes it easy to generate globally unique IDs (see the sketch below).
The remaining scenarios covered below are cluster management, Master election, distributed locks, and distributed queues (FIFO queues and distributed barriers).
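A rough sketch of generating IDs this way with the Curator client from section 3.3 (the /ids/order- prefix is just an illustration): creating a PERSISTENT_SEQUENTIAL node returns the full path, and the numeric suffix appended by the server serves as a globally unique, monotonically increasing ID.

// hypothetical ID generator built on sequential nodes
String path = client.create()
        .creatingParentsIfNeeded()
        .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
        .forPath("/ids/order-");
// e.g. "/ids/order-0000000042" -> take the trailing digits as the ID
long id = Long.parseLong(path.substring(path.lastIndexOf('-') + 1));
System.out.println("generated id: " + id);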
3. Cluster management
Ephemeral nodes plus watches make it easy to detect servers joining and leaving, which is the basis for managing a cluster.
A typical example is a distributed log collection system.
4. Master election
The Master usually coordinates the other units in the cluster and has the final say over state changes in the distributed system.
In read/write-splitting scenarios, the clients' write requests are normally handled by the Master; the Master performs the complex logic and then synchronizes the result to the other units. A hedged Curator sketch follows.
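One way to sketch this with Curator is the LeaderLatch recipe (it lives in curator-recipes, which is not in the dependency list above; the path and id below are illustrative). Every candidate starts a latch on the same path, and the one that acquires it becomes the Master:

LeaderLatch latch = new LeaderLatch(client, "/master-election", "node-1");
latch.start();
latch.await();                      // blocks until this instance is elected leader
try {
    System.out.println("I am the master now, id = " + latch.getId());
    // ... do the master's coordination work ...
} finally {
    latch.close();                  // give up leadership so another candidate can take over
}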
5. Distributed locks
Exclusive lock
If transaction T1 holds an exclusive lock on data object O1, then while the lock is held only T1 may read or update O1; no other transaction may perform any operation on it.
Shared lock (read lock)
If transaction T1 holds a shared lock on O1, T1 may only read O1, and other transactions may only acquire shared locks on it.
Herd effect: if every waiting client watches the same lock node, releasing the lock wakes all of them at once; the usual fix is to create sequential child nodes and have each client watch only the node just before its own (see the sketch below).
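As a sketch, Curator's InterProcessMutex recipe implements exactly this scheme (sequential child nodes where each client watches only its predecessor, which avoids the herd effect). It also requires curator-recipes; the lock path is illustrative:

InterProcessMutex lock = new InterProcessMutex(client, "/locks/resource-o1");
if (lock.acquire(10, TimeUnit.SECONDS)) {   // wait at most 10 seconds for the lock
    try {
        // ... read and update the shared resource O1 ...
    } finally {
        lock.release();
    }
}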
6. Distributed queues
(Note on ZAB guarantees, from the original notes: ZAB must ensure that transactions committed on the Leader are eventually committed by all servers, and that transactions that were only proposed on the Leader are discarded.)
FIFO queue
First Input First Output: elements are consumed in the order they were added.
Distributed barrier
A Barrier model in which queued elements are held back until enough of them have gathered and are then processed together; a Curator sketch follows.
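Curator also ships a DistributedBarrier recipe for this Barrier model (again from curator-recipes; the path is illustrative). A rough sketch:

DistributedBarrier barrier = new DistributedBarrier(client, "/barriers/job-1");
barrier.setBarrier();        // controller: put the barrier in place
// each worker then calls:
barrier.waitOnBarrier();     // blocks until the barrier node is removed
// once all participants have gathered, the controller calls:
barrier.removeBarrier();     // releases every waiting worker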
5. ZooKeeper internals
5.1 The ZAB protocol
ZAB (ZooKeeper Atomic Broadcast) is ZooKeeper's atomic message broadcast protocol.
It is an atomic broadcast protocol with support for crash recovery.
ZooKeeper uses ZAB to achieve distributed data consistency. It keeps the replicas consistent with a primary-backup architecture: a single primary process accepts and handles all client transaction requests, then broadcasts every state change to all replica processes as a transaction Proposal.
5.2 Two modes
Crash recovery and message broadcast.
5.3 Three server states
LOOKING: the server is in the Leader election phase
FOLLOWING: the server is a Follower, following the Leader
LEADING: the server is the Leader, acting as the coordinating main process
Every process starts out in the LOOKING state.
5.4 ZAB and Paxos: similarities and differences
Both have a Leader-like role that coordinates a set of Follower processes; in both, the Leader waits for more than half of the Followers to give positive feedback before a proposal is committed; and in ZAB every Proposal carries an epoch value identifying the current Leader period, for which Paxos has a comparable identifier.
ZAB is mainly used to build highly available, primary-backup distributed data systems,
while Paxos is mainly used to build distributed, consistent state-machine systems.
5.5 Server roles
Leader
The sole scheduler and handler of transaction requests, which guarantees the ordering of transactions in the cluster; it also schedules the other servers in the cluster.
Follower
Handles non-transactional client requests (reads), forwards transaction requests to the Leader, votes on transaction Proposals, and votes in Leader elections.
Observer
Serves non-transactional requests only and does not take part in any form of voting.
6. Source code analysis
1. Project setup
Add the log4j configuration file location to VM options:
-Dlog4j.configuration=file:C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\log4j.properties
Add the zoo.cfg path to Program arguments:
C:\Users\gaoyuan\Desktop\Zookeeper\Zookeeper\zookeeper_code\zookeeper_code\zookeeper-release-3.5.4\conf\zoo.cfg
https://www.cnblogs.com/heyonggang/p/12123991.html
2. Server startup flow
Entry class: ZooKeeperServerMain
main.initializeAndRun(args)
Register JMX:
ManagedUtil.registerLog4jMBeans();
Parse the configuration file:
config.parse(args[0]);
Initialize the transaction log and snapshot handling:
FileTxnSnapLog txnLog = null;
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
Create the ZooKeeperServer instance:
final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog, config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
Create the ServerCnxnFactory. ServerCnxnFactory is the component in ZooKeeper that handles connections between clients and the server:
cnxnFactory = ServerCnxnFactory.createFactory();
Apply the configuration to it:
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
@Override
public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
    if (secure) {
        throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
    }
    configureSaslLogin();

    maxClientCnxns = maxcc;
    sessionlessCnxnTimeout = Integer.getInteger(
        ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
    cnxnExpiryQueue =
        new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
    expirerThread = new ConnectionExpirerThread();

    int numCores = Runtime.getRuntime().availableProcessors();
    numSelectorThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
        Math.max((int) Math.sqrt((float) numCores / 2), 1));
    if (numSelectorThreads < 1) {
        throw new IOException("numSelectorThreads must be at least 1");
    }

    numWorkerThreads = Integer.getInteger(
        ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
    workerShutdownTimeoutMS = Long.getLong(
        ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

    LOG.info("Configuring NIO connection handler with "
             + (sessionlessCnxnTimeout / 1000) + "s sessionless connection"
             + " timeout, " + numSelectorThreads + " selector thread(s), "
             + (numWorkerThreads > 0 ? numWorkerThreads : "no")
             + " worker threads, and "
             + (directBufferBytes == 0 ? "gathered writes." :
                ("" + (directBufferBytes / 1024) + " kB direct buffers.")));

    for (int i = 0; i < numSelectorThreads; ++i) {
        selectorThreads.add(new SelectorThread(i));
    }

    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
Start the service:
cnxnFactory.startup(zkServer);
package org.apache.zookeeper.server;

ZooKeeperServerMain main = new ZooKeeperServerMain();

protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }
    runFromConfig(config);
}
public void runFromConfig(ServerConfig config)
    throws IOException, AdminServerException
{
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
            config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
            new ZooKeeperServerShutdownHandler(shutdownLatch));

        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
            Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
            Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        shutdownLatch.await();

        shutdown();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
3. Leader election
Concepts
External vote: a vote received from another server
Internal vote: the server's own current vote
Election round: the round of the Leader election, i.e. the logical clock
PK: comparing an external vote against the internal vote to decide whether the internal vote must change
sendqueue: the queue of votes waiting to be sent
recvqueue: the queue that stores received external votes
Leader election at server startup
1) Each server sends out a vote
2) Each server receives the votes from the other servers
3) The votes are processed:
the ZXID is checked first, and the server with the larger ZXID is preferred as Leader; if the ZXIDs are equal, the myid is compared and the server with the larger myid wins
4) The votes are tallied
5) The server state is changed
Leader election while the cluster is running
1) Change state
2) Each server sends out a vote
3) Receive the votes from the other servers (same as at startup)
4) Process the votes
5) Tally the votes
6) Change the server state
public interface Election {
    public Vote lookForLeader() throws InterruptedException;
    public void shutdown();
}
FastLeaderElection has three important inner classes.
1. Notification
Represents a received election vote (vote information sent by another server).
static public class Notification {
    public final static int CURRENTVERSION = 0x2;

    int version;
    long leader;
    long zxid;
    long electionEpoch;
    QuorumPeer.ServerState state;
    long sid;
    QuorumVerifier qv;
    long peerEpoch;
}
2. ToSend
Represents an election vote to be sent to other servers.
static public class ToSend {
    static enum mType {crequest, challenge, notification, ack}

    ToSend(mType type,
            long leader,
            long zxid,
            long electionEpoch,
            ServerState state,
            long sid,
            long peerEpoch,
            byte[] configData) {

        this.leader = leader;
        this.zxid = zxid;
        this.electionEpoch = electionEpoch;
        this.state = state;
        this.sid = sid;
        this.peerEpoch = peerEpoch;
        this.configData = configData;
    }

    long leader;
    long zxid;
    long electionEpoch;
    QuorumPeer.ServerState state;
    long sid;
    byte[] configData = dummyData;
    long peerEpoch;
}

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;
3.Messenger
protected class Messenger {

    class WorkerReceiver extends ZooKeeperThread {
        volatile boolean stop;
        QuorumCnxManager manager;

        WorkerReceiver(QuorumCnxManager manager) {
            super("WorkerReceiver");
            this.stop = false;
            this.manager = manager;
        }

        public void run() {
            Message response;
            while (!stop) {
                try {
                    response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                    if (response == null) continue;

                    if (response.buffer.capacity() < 28) {
                        LOG.error("Got a short response: " + response.buffer.capacity());
                        continue;
                    }

                    boolean backCompatibility28 = (response.buffer.capacity() == 28);
                    boolean backCompatibility40 = (response.buffer.capacity() == 40);

                    response.buffer.clear();

                    Notification n = new Notification();

                    int rstate = response.buffer.getInt();
                    long rleader = response.buffer.getLong();
                    long rzxid = response.buffer.getLong();
                    long relectionEpoch = response.buffer.getLong();
                    long rpeerepoch;

                    int version = 0x0;
                    if (!backCompatibility28) {
                        rpeerepoch = response.buffer.getLong();
                        if (!backCompatibility40) {
                            version = response.buffer.getInt();
                        } else {
                            LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                        }
                    } else {
                        LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                        rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                    }

                    QuorumVerifier rqv = null;

                    if (version > 0x1) {
                        int configLength = response.buffer.getInt();
                        byte b[] = new byte[configLength];
                        response.buffer.get(b);

                        synchronized (self) {
                            try {
                                rqv = self.configFromString(new String(b));
                                QuorumVerifier curQV = self.getQuorumVerifier();
                                if (rqv.getVersion() > curQV.getVersion()) {
                                    LOG.info("{} Received version: {} my version: {}", self.getId(),
                                            Long.toHexString(rqv.getVersion()),
                                            Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    if (self.getPeerState() == ServerState.LOOKING) {
                                        LOG.debug("Invoking processReconfig(), state: {}", self.getServerState());
                                        self.processReconfig(rqv, null, null, false);
                                        if (!rqv.equals(curQV)) {
                                            LOG.info("restarting leader election");
                                            self.shuttingDownLE = true;
                                            self.getElectionAlg().shutdown();
                                            break;
                                        }
                                    } else {
                                        LOG.debug("Skip processReconfig(), state: {}", self.getServerState());
                                    }
                                }
                            } catch (IOException e) {
                                LOG.error("Something went wrong while processing config received from {}", response.sid);
                            } catch (ConfigException e) {
                                LOG.error("Something went wrong while processing config received from {}", response.sid);
                            }
                        }
                    } else {
                        LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                    }

                    if (!validVoter(response.sid)) {
                        Vote current = self.getCurrentVote();
                        QuorumVerifier qv = self.getQuorumVerifier();
                        ToSend notmsg = new ToSend(ToSend.mType.notification,
                                current.getId(),
                                current.getZxid(),
                                logicalclock.get(),
                                self.getPeerState(),
                                response.sid,
                                current.getPeerEpoch(),
                                qv.toString().getBytes());

                        sendqueue.offer(notmsg);
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Receive new notification message. My id = "
                                    + self.getId());
                        }

                        QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
                        switch (rstate) {
                        case 0:
                            ackstate = QuorumPeer.ServerState.LOOKING;
                            break;
                        case 1:
                            ackstate = QuorumPeer.ServerState.FOLLOWING;
                            break;
                        case 2:
                            ackstate = QuorumPeer.ServerState.LEADING;
                            break;
                        case 3:
                            ackstate = QuorumPeer.ServerState.OBSERVING;
                            break;
                        default:
                            continue;
                        }

                        n.leader = rleader;
                        n.zxid = rzxid;
                        n.electionEpoch = relectionEpoch;
                        n.state = ackstate;
                        n.sid = response.sid;
                        n.peerEpoch = rpeerepoch;
                        n.version = version;
                        n.qv = rqv;

                        if (LOG.isInfoEnabled()) {
                            printNotification(n);
                        }

                        if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
                            recvqueue.offer(n);

                            if ((ackstate == QuorumPeer.ServerState.LOOKING)
                                    && (n.electionEpoch < logicalclock.get())) {
                                Vote v = getVote();
                                QuorumVerifier qv = self.getQuorumVerifier();
                                ToSend notmsg = new ToSend(ToSend.mType.notification,
                                        v.getId(),
                                        v.getZxid(),
                                        logicalclock.get(),
                                        self.getPeerState(),
                                        response.sid,
                                        v.getPeerEpoch(),
                                        qv.toString().getBytes());
                                sendqueue.offer(notmsg);
                            }
                        } else {
                            Vote current = self.getCurrentVote();
                            if (ackstate == QuorumPeer.ServerState.LOOKING) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                            self.getId(),
                                            response.sid,
                                            Long.toHexString(current.getZxid()),
                                            current.getId(),
                                            Long.toHexString(self.getQuorumVerifier().getVersion()));
                                }

                                QuorumVerifier qv = self.getQuorumVerifier();
                                ToSend notmsg = new ToSend(
                                        ToSend.mType.notification,
                                        current.getId(),
                                        current.getZxid(),
                                        current.getElectionEpoch(),
                                        self.getPeerState(),
                                        response.sid,
                                        current.getPeerEpoch(),
                                        qv.toString().getBytes());
                                sendqueue.offer(notmsg);
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Interrupted Exception while waiting for new message" +
                            e.toString());
                }
            }
            LOG.info("WorkerReceiver is down");
        }
    }

    class WorkerSender extends ZooKeeperThread {
        volatile boolean stop;
        QuorumCnxManager manager;

        WorkerSender(QuorumCnxManager manager) {
            super("WorkerSender");
            this.stop = false;
            this.manager = manager;
        }

        public void run() {
            while (!stop) {
                try {
                    ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                    if (m == null) continue;

                    process(m);
                } catch (InterruptedException e) {
                    break;
                }
            }
            LOG.info("WorkerSender is down");
        }

        void process(ToSend m) {
            ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                    m.leader,
                    m.zxid,
                    m.electionEpoch,
                    m.peerEpoch,
                    m.configData);

            manager.toSend(m.sid, requestBuffer);
        }
    }

    WorkerSender ws;
    WorkerReceiver wr;
    Thread wsThread = null;
    Thread wrThread = null;

    Messenger(QuorumCnxManager manager) {
        this.ws = new WorkerSender(manager);
        this.wsThread = new Thread(this.ws,
                "WorkerSender[myid=" + self.getId() + "]");
        this.wsThread.setDaemon(true);

        this.wr = new WorkerReceiver(manager);
        this.wrThread = new Thread(this.wr,
                "WorkerReceiver[myid=" + self.getId() + "]");
        this.wrThread.setDaemon(true);
    }

    void start() {
        this.wsThread.start();
        this.wrThread.start();
    }

    void halt() {
        this.ws.stop = true;
        this.wr.stop = true;
    }
}
Class fields
private static final Logger LOG = LoggerFactory.getLogger(FastLeaderElection.class);

final static int finalizeWait = 200;
final static int maxNotificationInterval = 60000;
final static int IGNOREVALUE = -1;

QuorumCnxManager manager;

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;

QuorumPeer self;
Messenger messenger;
volatile long logicalclock;
long proposedLeader;
long proposedZxid;
long proposedEpoch;

volatile boolean stop;
4. sendNotifications
private void sendNotifications() {
    for (long sid : self.getCurrentAndNextConfigVoters()) {
        QuorumVerifier qv = self.getQuorumVerifier();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch, qv.toString().getBytes());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
                    Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) +
                    " (n.round), " + sid + " (recipient), " + self.getId() +
                    " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        sendqueue.offer(notmsg);
    }
}
5. totalOrderPredicate
This method compares (PKs) a received vote against the server's own vote to decide whether the vote in the message is better, comparing epoch first, then zxid, then server id.
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));

    if (self.getQuorumVerifier().getWeight(newId) == 0) {
        return false;
    }

    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
6. termPredicate
Determines whether the Leader election can end, i.e. whether more than half of the servers have chosen the same Leader.
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        // collect the servers whose vote equals the current vote
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }

    return voteSet.hasAllQuorums();
}
Additional concepts
1. zxid
Every operation that changes ZooKeeper's state stamps the affected node with a zxid-format timestamp, and these timestamps are globally ordered; every change to a node therefore produces a unique zxid.
cZxid
The zxid of the transaction that created the node (Create).
mZxid
The zxid of the node's most recent modification (Modify); changes to its children do not affect it.
Problems encountered
Jar errors
The jars under the project's Build directory need to be added to the project as dependencies.
Vocabulary
subscribe: to sign up for updates (as in publish/subscribe)
ACL (Access Control List): access control list
latch: a latch or bolt (as in CountDownLatch)
election: election
notification: notification
extract: to extract
verifier: checker (as in QuorumVerifier)
vote: vote
quorum: a majority
predicate: predicate, assertion
epoch: era, period (a Leader's term)
interval: interval, gap
proposal: proposal (as in transaction Proposal)
logical: logical (as in logical clock)
policy: policy (as in RetryPolicy)
retry: to try again
exponential: exponential (as in ExponentialBackoffRetry)
guarantee: guarantee
tribute: tribute, homage