This article picks up where the previous post, the Kafka server-side source code analysis of Controller initialization, left off, and continues with the startup of the replica state machine and the partition state machine during Controller initialization.
Replica state machine
Kafka defines 7 replica states; the diagram below shows how the states flow into one another.
A replica's state is represented by the ReplicaState trait. The validPreviousStates field deserves a mention: it lists the states a replica may legally come from. Taking NewReplica as an example, it can only be reached from NonExistentReplica:

case object NewReplica extends ReplicaState {
  val state: Byte = 1
  val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
}
Every state transition involves a fair number of update operations; ReplicaStateMachine#doHandleStateChanges handles all of the transitions in one place.
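Before a transition is applied, doHandleStateChanges checks that the replica's current state appears in the target state's validPreviousStates. Below is a minimal, self-contained model of that check; only three of the seven states are shown, and it is an illustration of the idea rather than the actual Kafka code:

object ReplicaStateTransitionDemo {

  sealed trait ReplicaState {
    def state: Byte
    def validPreviousStates: Set[ReplicaState]
  }

  case object NonExistentReplica extends ReplicaState {
    val state: Byte = 7
    val validPreviousStates: Set[ReplicaState] = Set.empty // simplified for this demo
  }

  case object NewReplica extends ReplicaState {
    val state: Byte = 1
    val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
  }

  case object OnlineReplica extends ReplicaState {
    val state: Byte = 2
    val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica) // subset shown
  }

  // the check the state machine performs before applying a transition
  def isValidTransition(current: ReplicaState, target: ReplicaState): Boolean =
    target.validPreviousStates.contains(current)

  def main(args: Array[String]): Unit = {
    println(isValidTransition(NonExistentReplica, NewReplica))    // true
    println(isValidTransition(NonExistentReplica, OnlineReplica)) // false, illegal transition
  }
}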
Coming back to the replicaStateMachine.startup() method: its main job is to move the online replicas to the OnlineReplica state.
def startup() { ... }
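Expanded, the method boils down to the two steps covered in the rest of this section: build the replicaState cache, then move every replica hosted on a live broker to OnlineReplica. A condensed paraphrase (not the verbatim source; allLiveReplicas() is assumed to be the ControllerContext helper that returns the replicas on live brokers):

def startup() {
  // populate the replicaState cache (OnlineReplica / ReplicaDeletionIneligible)
  initializeReplicaState()
  // move every replica that lives on a live broker to the OnlineReplica state
  handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
}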
Initializing the replica state cache
Let's start with initializeReplicaState; once you understand controllerContext there is nothing difficult about it.
The method populates the replicaState cache, which records the state of every replica; depending on whether the replica is online it is marked OnlineReplica or ReplicaDeletionIneligible.

private def initializeReplicaState() {
controllerContext.allPartitions.foreach { partition =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
replicas.foreach { replicaId =>
val partitionAndReplica = PartitionAndReplica(partition, replicaId)
if (controllerContext.isReplicaOnline(replicaId, partition))
replicaState.put(partitionAndReplica, OnlineReplica)
else
// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
// This is required during controller failover since during controller failover a broker can go down,
// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
}
}
}
After replicaState is initialized, handleStateChanges moves all live replicas to the OnlineReplica state; for a healthy replica this is simply an OnlineReplica -> OnlineReplica transition.

def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
callbacks: Callbacks = new Callbacks()): Unit = {
if (replicas.nonEmpty) {
try {
controllerBrokerRequestBatch.newBatch()
replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
val partitions = replicas.map(_.topicPartition)
doHandleStateChanges(replicaId, partitions, targetState, callbacks)
}
      // send the requests accumulated in ControllerChannelManager
controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
} catch {
case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
}
}
}
doHandleStateChanges performs the replica state transitions; here we only care about the case where targetState is OnlineReplica.
private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState, ...
A replica can move to OnlineReplica from any of the NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible states.
When the source state is NewReplica, the controller only checks the local cache and updates it if the replica is missing; for the other source states it needs to send a LeaderAndIsr request to synchronize data between brokers, as paraphrased below.
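A condensed paraphrase of the OnlineReplica branch, based on the description above rather than the verbatim source (validReplicas stands for the replicas that already passed the validPreviousStates check, and the assignment-cache update of the NewReplica case is only indicated by a comment):

case OnlineReplica =>
  validReplicas.foreach { replica =>
    val partition = replica.topicPartition
    replicaState(replica) match {
      case NewReplica =>
        // a brand-new replica: just make sure the cached assignment contains it,
        // no request needs to be sent
        val assignment = controllerContext.partitionReplicaAssignment(partition)
        if (!assignment.contains(replicaId)) {
          // add replicaId to the cached assignment for this partition
        }
      case _ =>
        // replica already known: batch a LeaderAndIsr request so the broker learns
        // the current leader and ISR of the partition
        controllerContext.partitionLeadershipInfo.get(partition).foreach { leaderIsrAndControllerEpoch =>
          controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId), partition,
            leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = false)
        }
    }
    replicaState.put(replica, OnlineReplica)
  }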
That completes the startup of the replica state machine. The LeaderAndIsr request, one of the most important requests in Kafka, will be analyzed in a separate later article.
Partition state machine
Compared with the replica state machine, the partition state machine has only 4 states (NewPartition, OnlinePartition, OfflinePartition, NonExistentPartition), but because it involves leader election the state transitions are considerably more complex.
The startup method of PartitionStateMachine is as follows:

def startup() {
  // initialize the state of every partition
  initializePartitionState()
  triggerOnlinePartitionStateChange()
}
Initializing the partition state cache
Like the replica state machine, initializePartitionState records the state of every partition in a partitionState cache.
All partitions in the cache fall into one of three initial states:
- the partition has a leader replica: OnlinePartition if the leader is online, otherwise OfflinePartition
- the partition has no leader yet: NewPartition
private def initializePartitionState() {
for (topicPartition <- controllerContext.allPartitions) {
// check if leader and isr path exists for partition. If not, then it is in NEW state
    // fetch the leader and ISR info
controllerContext.partitionLeadershipInfo.get(topicPartition) match {
case Some(currentLeaderIsrAndEpoch) =>
// else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
        // if the leader is alive the partition is OnlinePartition, otherwise OfflinePartition
if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
// leader is alive
partitionState.put(topicPartition, OnlinePartition)
else
partitionState.put(topicPartition, OfflinePartition)
case None =>
        // no leader yet, so the partition is in the NewPartition state
partitionState.put(topicPartition, NewPartition)
}
}
}
Once partitionState is initialized, the partition state machine moves the partitions in the OfflinePartition and NewPartition states to OnlinePartition.
When the brokers are running normally every partition is already OnlinePartition, so in that case handleStateChanges effectively does nothing.
def triggerOnlinePartitionStateChange() { ... }
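Expanded, it collects every partition still in the NewPartition or OfflinePartition state and asks handleStateChanges to bring it to OnlinePartition with the offline-partition election strategy. A condensed paraphrase (any extra filtering in the real method, e.g. for topics queued up for deletion, is omitted):

def triggerOnlinePartitionStateChange() {
  // every partition that is not yet online is a candidate
  val partitionsToTrigger = partitionState.filter { case (_, state) =>
    state == OfflinePartition || state == NewPartition
  }.keys.toSeq
  handleStateChanges(partitionsToTrigger, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
}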
Partition leader election
doHandleStateChanges is mainly about electing leader replicas for partitions. It first splits the partitions into two groups:
- uninitialized partitions (uninitializedPartitions): partitions in the NewPartition state
- partitions that need a leader election (partitionsToElectLeader): partitions in the OfflinePartition or OnlinePartition state
doHandleStateChanges elects a leader for both groups and records the result in the partitionState cache mentioned earlier.
Note: the election strategy passed in from the previous step is OfflinePartitionLeaderElectionStrategy.

private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
  // the same pre-processing as in the replica state machine
val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
targetState match {
case OnlinePartition =>
      val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition) // type: Seq[TopicPartition]
val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
      // handle the partitions in the NewPartition state
if (uninitializedPartitions.nonEmpty) {
        // initialize the leaders of the new partitions
val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
successfulInitializations.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
partitionState.put(partition, OnlinePartition)
}
}
      // start the election among the OfflinePartition/OnlinePartition partitions
if (partitionsToElectLeader.nonEmpty) {
        // elect leader replicas according to the election strategy
val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
successfulElections.foreach { partition =>
stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
          // update the partition's state to OnlinePartition
partitionState.put(partition, OnlinePartition)
}
}
}
}
Now let's look at how the leader replica is elected for each of these two groups.
Electing a leader replica for NewPartition partitions
initializeLeaderAndIsrForPartitions elects a leader replica for each partition in the NewPartition state.
The code looks long, but it can be summed up in one sentence: take the first replica in the list of live replicas as the leader, write it back to the partition's state node in ZooKeeper, update the local cache, and send LeaderAndIsr requests to bring the other brokers up to date.
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = { ... }
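The leader choice itself can be illustrated with a small self-contained helper (the names and types here are simplified stand-ins; the ZooKeeper write, the cache update and the request batching are left out):

// Given the assigned replicas of a new partition and the set of live brokers:
// the leader is the first live replica and the initial ISR is all live replicas.
def initialLeaderAndIsr(assignedReplicas: Seq[Int], liveBrokerIds: Set[Int]): Option[(Int, Seq[Int])] = {
  val liveReplicas = assignedReplicas.filter(liveBrokerIds.contains)
  liveReplicas.headOption.map(leader => (leader, liveReplicas))
}

// e.g. assignment (2, 0, 1) with broker 2 down -> leader 0, ISR Seq(0, 1)
// initialLeaderAndIsr(Seq(2, 0, 1), Set(0, 1)) == Some((0, Seq(0, 1)))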
Electing a leader replica for OfflinePartition/OnlinePartition partitions
electLeaderForPartitions elects leader replicas for all partitions in the OfflinePartition or OnlinePartition state.
The election for each individual partition is implemented in doElectLeaderForPartitions. There is a lot of code, but the core is still the same: elect a leader replica, write it back to ZooKeeper, update the local cache, and send LeaderAndIsr requests to the other brokers.
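electLeaderForPartitions itself is essentially a retry loop around doElectLeaderForPartitions: partitions whose ZooKeeper update failed with a version conflict (the updatesToRetry result below) are fed back in until they succeed or fail hard. A condensed paraphrase (not the verbatim source):

private def electLeaderForPartitions(partitions: Seq[TopicPartition],
                                     partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = {
  val successfulElections = mutable.Buffer.empty[TopicPartition]
  var remaining = partitions
  while (remaining.nonEmpty) {
    val (success, updatesToRetry, failedElections) =
      doElectLeaderForPartitions(remaining, partitionLeaderElectionStrategy)
    successfulElections ++= success
    // partitions that hit a ZooKeeper version conflict are retried; hard failures are only logged
    remaining = updatesToRetry
    failedElections.foreach { case (partition, e) =>
      error(s"Failed to elect leader for partition $partition", e)
    }
  }
  successfulElections
}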
Partition leaders are elected in several different situations, so there are 4 election strategies; here, following the parameter passed in earlier, the strategy is OfflinePartitionLeaderElectionStrategy.

private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
(Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
  // first fetch each partition's .../partitions/xxx/state node from ZooKeeper in one batch
  // example: {"controller_epoch":19,"leader":0,"version":1,"leader_epoch":57,"isr":[0,1,2]}
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
}
val failedElections = mutable.Map.empty[TopicPartition, Exception]
val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
  // this loop mainly populates leaderIsrAndControllerEpochPerPartition
getDataResponses.foreach { getDataResponse =>
    // the ctx is the partition that was passed in with the request
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
    val currState = partitionState(partition) // the partition's state in the local cache
if (getDataResponse.resultCode == Code.OK) {
      // decode into a LeaderIsrAndControllerEpoch
val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
      // if no leaderIsrAndControllerEpoch could be obtained, add the partition to failedElections
if (leaderIsrAndControllerEpochOpt.isEmpty) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, exception)
}
      leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get // parentheses would read better: (partition -> leaderIsrAndControllerEpochOpt.get)
} else if (getDataResponse.resultCode == Code.NONODE) {
      // the node does not exist
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, exception)
} else {
      // any other ZooKeeper error
failedElections.put(partition, getDataResponse.resultException.get)
}
}
  // if the controllerEpoch in ZooKeeper is larger than the locally cached one, another Controller has been elected and has already written to the partition's state node
val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
}
invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
s"already written by another controller. This probably means that the current controller $controllerId went through " +
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
failedElections.put(partition, new StateChangeFailedException(failMsg))
}
  // every partition's state has been updated by the new Controller: return failedElections directly
if (validPartitionsForElection.isEmpty) {
return (Seq.empty, Seq.empty, failedElections.toMap)
}
val shuttingDownBrokers = controllerContext.shuttingDownBrokerIds.toSet
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
    case OfflinePartitionLeaderElectionStrategy => // during initialization this is the strategy that gets passed in (trace the parameter passing)
      // note the Scala syntax here: partition splits the results by a boolean predicate and binds the two halves to the outer val
leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
    case ReassignPartitionLeaderElectionStrategy => // election strategy used during partition reassignment
leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
case PreferredReplicaPartitionLeaderElectionStrategy =>
leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
case ControlledShutdownPartitionLeaderElectionStrategy =>
leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
}
  // partitions for which no leader could be elected
partitionsWithoutLeaders.foreach { case (partition, _, _) =>
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, new StateChangeFailedException(failMsg))
}
  // map each partition to its live replicas
val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
  // map each partition to its elected leaderAndIsr
val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
  // for every successfully elected partition, update the leaderAndIsr and controller epoch in ZooKeeper
val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch)
  // updatesToRetry holds the partitions whose update failed because of a version conflict
successfulUpdates.foreach { case (partition, leaderAndIsr) =>
val replicas = controllerContext.partitionReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
    // the ZooKeeper update succeeded, so put it into the local cache
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
    // batch a LeaderAndIsr request to the recipient brokers
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicas, isNew = false)
}
  (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates) // failed elections merged with failed ZooKeeper updates
}
The election algorithm for OfflinePartitionLeaderElectionStrategy is implemented in the leaderForOffline method.
leaderForOffline election
The election is constrained by the unclean.leader.election.enable configuration, which can also be set at the topic level. In production it is usually set to false; otherwise the leader may be elected from replicas outside the ISR, which can lead to data inconsistency.
private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): ...
The actual leader election algorithm is implemented in PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection.
The election algorithm
The algorithm itself is fairly simple: pick the first replica in the assignment that is both in the ISR and alive as the leader.
If there is no such replica and unclean.leader.election.enable=true, pick the first live replica out of all assigned replicas as the leader; a self-contained sketch follows.
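A self-contained version of that algorithm (the parameter names are mine; the real method lives in PartitionLeaderElectionAlgorithms and takes a couple of extra arguments):

// the first assigned replica that is alive and in the ISR wins; if there is none and
// unclean leader election is enabled, fall back to the first live replica regardless of the ISR
def offlinePartitionLeaderElection(assignment: Seq[Int],
                                   isr: Seq[Int],
                                   liveReplicas: Set[Int],
                                   uncleanLeaderElectionEnabled: Boolean): Option[Int] = {
  assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
    if (uncleanLeaderElectionEnabled) assignment.find(liveReplicas.contains) else None
  }
}

// e.g. assignment Seq(1, 2, 3), isr Seq(3), live brokers Set(2, 3) -> Some(3)
//      with broker 3 also down (live Set(2)) -> None, unless
//      unclean.leader.election.enable=true, in which case -> Some(2)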
Summary
With that, the startup of the replica state machine and the partition state machine is complete. Both start by initializing their state caches and then performing the initial state transitions, updating the ControllerContext and the data in ZooKeeper along the way; in addition, the partition state machine has to elect a leader replica for each partition.