Zookeeper 快速领导者选举原理

本文由 简悦 SimpRead 转码, 原文地址 mp.weixin.qq.com

人类选举的基本原理

正常情况下,选举是一定要投票的。

我们应该都经历过投票,在投票时我们可能会将票投给和我们关系比较好的人,如果你和几个候选人都比较熟,这种情况下你会将选票投给你认为能力比较强的人,如果你和几个候选人都不熟,并且你自己也是候选人的话,这时你应该会认为你是这些候选人里面最厉害的那个人,大家都应该选你,这时你就会去和别人交流以获得别人的投票,但是很有可能在交流的过程中,你发现了比你更厉害的人,这时你如果脸皮不是那么厚的话,你应该会改变你的决定,去投你觉得更厉害的人,最终你将得到在你心中认为最厉害的人,且将票投给他,选票将会放在投票中,最后从投票箱中进行统计,获得票数最多的人当选。

在这样一个选举过程中我们提炼出四个基本概念:

  1. 个人能力:投我认为能力最强的人,这是投票的基本规则

  2. 改票:能力最强的人是逐渐和其他人沟通之后的结果,类似改票,先投给 A,但是后来发现 B 更厉害,则改为投 B

  3. 投票箱:所有人公用一个投票箱

  4. 领导者:获得投票数最多的人为领导者

Zookeeper 选举的基本原理

Zookeeper 集群模式下才需要选举。

Zookeeper 的选举和人类的选举逻辑类似,Zookeeper 需要实现上面人类选举的四个基本概念;

  1. 个人能力:Zookeeper 是一个数据库,集群中节点的数据越新就代表此节点能力越强,而在 Zookeeper 中可以通事务 id(zxid) 来表示数据的新旧,一个节点最新的 zxid 越大则该节点的数据越新。所以 Zookeeper 选举时会根据 zxid 的大小来作为投票的基本规则。

  2. 改票:Zookeeper 集群中的某一个节点在开始进行选举时,首先认为自己的数据是最新的,会先投自己一票,并且把这张选票发送给其他服务器,这张选票里包含了两个重要信息:zxidsid,sid 表示这张选票投的服务器 id,zxid 表示这张选票投的服务器上最大的事务 id,同时也会接收到其他服务器的选票,接收到其他服务器的选票后,可以根据选票信息中的 zxid 来与自己当前所投的服务器上的最大 zxid 来进行比较,如果其他服务器的选票中的 zxid 较大,则表示自己当前所投的机器数据没有接收到的选票所投的服务器上的数据新,所以本节点需要改票,改成投给和刚刚接收到的选票一样。

  3. 投票箱:Zookeeper 集群中会有很多节点,和人类选举不一样,Zookeeper 集群并不会单独去维护一个投票箱应用,而是在每个节点内存里利用一个数组来作为投票箱。每个节点里都有一个投票箱,节点会将自己的选票以及从其他服务器接收到的选票放在这个投票箱中。因为集群节点是相互交互的,并且选票的 PK 规则是一致的,所以每个节点里的这个投票箱所存储的选票都会是一样的,这样也可以达到公用一个投票箱的目的。

  4. 领导者:Zookeeper 集群中的每个节点,开始进行领导选举后,会不断的接收其他节点的选票,然后进行选票 PK,将自己的选票修改为投给数据最新的节点,这样就保证了,每个节点自己的选票代表的都是自己暂时所认为的数据最新的节点,再因为其他服务器的选票都会存储在投票箱内,所以可以根据投票箱里去统计是否有超过一半的选票和自己选择的是同一个节点,都认为这个节点的数据最新,一旦整个集群里超过一半的节点都认为某一个节点上的数据最新,则该节点就是领导者。

通过对四个概念的在 Zookeeper 中的解析,也同时介绍了一下 Zookeeper 领导者选举的基本原理,只是说选举过程中还有更多的细节需要我们了解,下面我结合源码来给大家详细的分析一下 Zookeeper 的快速领导者选举原理。

领导者选举入口

ZooKeeperServer 表示单机模式中的一个 zkServer。QuoruPeer 表示集群模式中的一个 zkServer。QuoruPeer 类定义如下:

1
public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider

定义表明 QuorumPeer 是一个 ZooKeeperThread,表示是一个线程。当集群中的某一个台 zkServer 启动时 QuorumPeer 类的 start 方法将被调用。

1
2
3
4
5
6
public synchronized void start() {
loadDataBase(); // 1
cnxnFactory.start(); // 2
startLeaderElection(); // 3
super.start(); // 4
}
  1. zkServer 中有一个内存数据库对象 ZKDatabase, zkServer 在启动时需要将已被持久化的数据加载进内存中,也就是加载至 ZKDatabase。

  2. 这一步会开启一个线程来接收客户端请求,但是需要注意,这一步执行完后虽然成功开启了一个线程,并且也可以接收客户端线程,但是因为现在 zkServer 还没有经过初始化,实际上把请求拒绝掉,知道 zkServer 初始化完成才能正常的接收请求。

  3. 这个方法名很有误导性,这个方法并没有真正的开始领导选举,而是进行一些初始化

  4. 继续启动,包括进行领导者选举、zkServer 初始化。

领导者选举策略

上文 QuorumPeer 类的 startLeaderElection 会进行领导者选举初始化。首先,领导者选举在 Zookeeper 中有 3 种实现:其中 LeaderElection、AuthFastLeaderElection 已经被标为过期,不建议使用,所以现在用的都是快速领导者选举 FastLeaderElection,我们着重来介绍 FastLeaderElection。

快速领导者选举

快速领导者选举实现架构如下图:

传输层初始化

从架构图我们可以发现,快速领导者选举实现架构分为两层:应用层传输层。所以初始化核心就是初始化传输层。

初始化步骤:

  1. 初始化 QuorumCnxManager

  2. 初始化 QuorumCnxManager.Listener

  3. 运行 QuorumCnxManager.Listener

  4. 运行 QuorumCnxManager

  5. 返回 FastLeaderElection 对象

QuorumCnxManager 介绍

QuorumCnxManager 就是传输层实现,QuorumCnxManager 中几个重要的属性:

  • ConcurrentHashMap<long, arrayblockingqueue

    queueSendMap

  • ConcurrentHashMap

    senderWorkerMap

  • ArrayBlockingQueue

    recvQueue

  • QuorumCnxManager.Listener

传输层的每个 zkServer 需要发送选票信息给其他服务器,这些选票信息来至应用层,在传输层中将会按服务器 id 分组保存在 queueSendMap 中。

传输层的每个 zkServer 需要发送选票信息给其他服务器,SendWorker 就是封装了 Socket 的发送器,而 senderWorkerMap 就是用来记录其他服务器 id 以及对应的 SendWorker 的。

传输层的每个 zkServer 将接收其他服务器发送的选票信息,这些选票会保存在 recvQueue 中,以提供给应用层使用。

QuorumCnxManager.Listener 负责开启 socket 监听。

细化后的架构图如下:

服务器之间连接问题

在集群启动时,一台服务器需要去连另外一台服务器,从而建立 Socket 用来进行选票传输。那么如果现在 A 服务器去连 B 服务器,同时 B 服务器也去连 A 服务器,那么就会导致建立了两条 Socket,我们知道 Socket 是双向的,Socket 的双方是可以相互发送和接收数据的,那么现在 A、B 两台服务器建立两条 Socket 是没有意义的,所以 ZooKeeper 在实现时做了限制,只允许服务器 ID 较大者去连服务器 ID 较小者,小 ID 服务器去连大 ID 服务器会被拒绝,伪代码如下

1
2
3
4
5
6
if (对方服务器id < 本服务器id) {
closeSocket(sock); // 关闭这条socket
connectOne(sid); // 由本服务器去连对方服务器
} else {
// 继续建立连接
}

SendWorker、RecvWorker 介绍

上文介绍到了 SendWorker,它是 zkServer 用来向其他服务器发送选票信息的。类结构如下:

1
2
3
4
5
6
7
class SendWorker extends ZooKeeperThread {
Long sid;
Socket sock;
RecvWorker recvWorker;
volatile boolean running = true;
DataOutputStream dout;
}

它封装了 socket 并且是一个线程,实际上 SendWorker 的底层实现是:SendWorker 线程会不停的从 queueSendMap 中获取选票信息然后发送到 Socket 上。基于同样的思路,我们还需要一个线程从 Socket 上获取数据然后添加到 recvQueue 中,这就是 RecvWorker 的功能。

所以架构可以演化为下图,通过这个架构,选举应用层直接从 recvQueue 中获取选票,或者选票添加到 queueSendMap 中既可以完成选票发送:

应用层初始化

FastLeaderElection 类介绍

FastLeaderElection 类是快速领导者选举实现的核心类,这个类有三个重要的属性:

  • LinkedBlockingQueue<ToSend> sendqueue;

  • LinkedBlockingQueue<Notification> recvqueue;

  • Messenger messenger;

  • Messenger.WorkerSender

  • Messenger.WorkerReceiver

服务器在进行领导者选举时,在发送选票时也会同时接受其他服务器的选票,FastLeaderElection 类也提供了和传输层类似的实现,将待发送的选票放在 sendqueue 中,由 Messenger.WorkerSender 发送到传输层 queueSendMap 中。同样,由 Messenger.WorkerReceiver 负责从传输层获取数据并放入 recvqueue 中。

这样在应用层了,只需要将待发送的选票信息添加到 sendqueue 中即可完成选票信息发送,或者从 recvqueue 中获取元素即可得到选票信息。

在构造 FastLeaderElection 对象时,会对 sendqueue、recvqueue 队列进行初始化,并且运行 Messenger.WorkerSenderMessenger.WorkerReceiver 线程。

此时架构图如下:

到这里,QuorumPeer 类的 startLeaderElection 方法已经执行完成,完成了传输层和应用层的初始化。

快速领导者选举实现

QuorumPeer 类的 start 方法前三步分析完,接下来我们来看看第四步:

1
super.start();

QuorumPeer 类是一个 ZooKeeperThread 线程,上述代码实际就是运行一个线程,相当于运行 QuorumPeer 类中的 run 方法,这个方法也是集群模式下 Zkserver 启动最核心的方法。

总结一下 QuorumPeer 类的 start 方法:

  1. 加载持久化数据到内存

  2. 初始化领导者选举策略

  3. 初始化快速领导者选举传输层

  4. 初始化快速领导者选举应用层

  5. 开启主线程

主线程开启之后,QuorumPeer 类的 start 方法即执行完成,这时回到上层代码可以看到主线程会被 join 住:

1
2
quorumPeer.start(); // 开启线程
quorumPeer.join(); // join线程

接下来我们着重来分析一下主线程内的逻辑。

主线程

在主线程里,会有一个主循环 (Main loop),主循环伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
while (服务是否正在运行) {
switch (当前服务器状态) {
case LOOKING:
// 领导者选举
setCurrentVote(makeLEStrategy().lookForLeader());
break;
case OBSERVING:
try {
// 初始化为观察者
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING:
try {
// 初始化为跟随者
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setPeerState(ServerState.LOOKING);
}
break;
case LEADING:
try {
// 初始化为领导者
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
leader.shutdown("Forcing shutdown");
setPeerState(ServerState.LOOKING);
}
break;
}
}

这个伪代码实际上非常非常重要,大家细心的多看几遍。** 根据伪代码可以看到,当服务器状态为 LOOKING 时会进行领导者选举,所以我们着重来看领导者选举。

lookForLeader

当服务器状态为 LOOKING 时会调用 FastLeaderElection 类的 lookForLeader 方法,这就是领导者选举的应用层。

1. 初始化一个投票箱
1
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
2. 更新选票,将票投给自己
1
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
3. 发送选票
1
sendNotifications();
4. 不断获取其他服务器的投票信息,直到选出 Leader
1
2
3
4
5
6
7
8
9
10
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){
// 从recvqueue中获取接收到的投票信息
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

if (获得的投票为空) {
// 连接其他服务器
} else {
// 处理投票
}
}
5. 连接其他服务器

因为在这一步之前,都只进行了服务器的初始化,并没有真正的去与其他服务器建立连接,所以在这里建立连接。

6. 处理投票

判断接收到的投票所对应的服务器的状态,也就是投此票的服务器的状态:

1
2
3
4
5
6
7
8
9
10
11
switch (n.state) {
case LOOKING:
// PK选票、过半机制验证等
break;
case OBSERVING:
// 观察者节点不应该发起投票,直接忽略
break;
case FOLLOWING:
case LEADING:
// 如果接收到跟随者或领导者节点的选票,则可以认为当前集群已经存在Leader了,直接return,退出lookForLeader方法。
}
7. PK 选票
1
2
3
4
5
6
7
8
9
10
11
if (接收到的投票的选举周期 > 本服务器当前的选举周期) {
// 修改本服务器的选举周期为接收到的投票的选举周期
// 清空本服务器的投票箱(表示选举周期落后,重新开始投票)
// 比较接收到的选票所选择的服务器与本服务器的数据谁更新,本服务器将选票投给数据较新者
// 发送选票
} else if(接收到的投票的选举周期 < 本服务器当前的选举周期){
// 接收到的投票的选举周期落后了,本服务器直接忽略此投票
} else if(选举周期一致) {
// 比较接收到的选票所选择的服务器与本服务器当前所选择的服务器的数据谁更新,本服务器将选票投给数据较新者
// 发送选票
}
8. 过半机制验证

本服务器的选票经过不停的 PK 会将票投给数据更新的服务器,PK 完后,将接收到的选票以及本服务器自己所投的选票放入投票箱中,然后从投票箱中统计出与本服务器当前所投服务器一致的选票数量,判断该选票数量是否超过集群中所有跟随者的一半(选票数量 > 跟随者数量 / 2),如果满足这个过半机制就选出了一个准 Leader。**

9. 最终确认

选出准 Leader 之后,再去获取其他服务器的选票,如果获取到的选票所代表的服务器的数据比准 Leader 更新,则准 Leader 卸职,继续选举。如果没有准 Leader 更新,则继续获取投票,直到没有获取到选票,则选出了最终的 Leader。Leader 确定后,其他服务器的角色也确定好了。

领导选举完成后

上文主线程小节有一段非常重要的伪代码,这段伪代码达到了一个非常重要的功能,就是:

ZooKeeper 集群在进行领导者选举的过程中不能对外提供服务

根据伪代码我们可以发现,只有当集群中服务器的角色确定了之后,while 才会进行下一次循环,当进入下一次循环后,就会根据服务器的角色进入到对应的初始化逻辑,初始化完成之后才能对外提供服务。