This article continues from the previous post, "kafka-server source code analysis: Controller initialization", and covers how the replica state machine and the partition state machine are started during Controller initialization.
Replica state machine
Kafka models a replica as being in one of 7 states; the diagram below shows how a replica moves between them.
Replica states are represented by the ReplicaState trait. Its validPreviousStates member deserves a mention: it lists the states a replica may legally transition from. Taking NewReplica as an example, it can only be entered from the NonExistentReplica state.
```scala
case object NewReplica extends ReplicaState {
  val state: Byte = 1
  val validPreviousStates: Set[ReplicaState] = Set(NonExistentReplica)
}
```
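For readers without the diagram at hand, the sketch below paraphrases the full transition rules as a map from each target state to its validPreviousStates set. It is reconstructed from memory of the 1.1.x ReplicaStateMachine, and the exact sets may differ slightly between Kafka versions, so treat it as an orientation aid rather than as the authoritative source.

```scala
// Hedged paraphrase of the transition diagram: target state -> states it may be entered from.
// Reconstructed from memory of the 1.1.x ReplicaStateMachine; verify against your Kafka version.
val validPreviousStatesByTarget: Map[String, Set[String]] = Map(
  "NewReplica"                -> Set("NonExistentReplica"),
  "OnlineReplica"             -> Set("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
  "OfflineReplica"            -> Set("NewReplica", "OnlineReplica", "OfflineReplica", "ReplicaDeletionIneligible"),
  "ReplicaDeletionStarted"    -> Set("OfflineReplica"),
  "ReplicaDeletionSuccessful" -> Set("ReplicaDeletionStarted"),
  "ReplicaDeletionIneligible" -> Set("ReplicaDeletionStarted"),
  "NonExistentReplica"        -> Set("ReplicaDeletionSuccessful")
)
```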
State transitions inevitably involve a large amount of bookkeeping, and ReplicaStateMachine#doHandleStateChanges handles all of these transitions in a single place.
Back to replicaStateMachine.startup(): its main job is to move all online replicas into the OnlineReplica state.
```scala
def startup() {
  initializeReplicaState()
  handleStateChanges(controllerContext.allLiveReplicas().toSeq, OnlineReplica)
}
```
Initializing the replica state cache
First, look at initializeReplicaState. If you understood controllerContext from the previous post, there is nothing difficult here.
The method populates the replicaState cache, which records the state of every replica: each replica is put into either OnlineReplica or ReplicaDeletionIneligible depending on whether it is online.
```scala
private def initializeReplicaState() {
  controllerContext.allPartitions.foreach { partition =>
    val replicas = controllerContext.partitionReplicaAssignment(partition)
    replicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(partition, replicaId)
      if (controllerContext.isReplicaOnline(replicaId, partition))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
```
After replicaState has been initialized, handleStateChanges moves every live replica to OnlineReplica; for a healthy replica this is simply an OnlineReplica -> OnlineReplica transition.
```scala
def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = new Callbacks()): Unit = {
  if (replicas.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
        val partitions = replicas.map(_.topicPartition)
        doHandleStateChanges(replicaId, partitions, targetState, callbacks)
      }
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
    } catch {
      case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
    }
  }
}
```
doHandleStateChanges performs the actual replica state transitions; here we only care about the branch where targetState is OnlineReplica.
```scala
private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
                                 callbacks: Callbacks): Unit = {
  val replicas = partitions.map(partition => PartitionAndReplica(partition, replicaId))
  replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))
  val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))
  invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
  targetState match {
    case OnlineReplica =>
      validReplicas.foreach { replica =>
        val partition = replica.topicPartition
        replicaState(replica) match {
          case NewReplica =>
            val assignment = controllerContext.partitionReplicaAssignment(partition)
            if (!assignment.contains(replicaId)) {
              controllerContext.updatePartitionReplicaAssignment(partition, assignment :+ replicaId)
            }
          case _ =>
            controllerContext.partitionLeadershipInfo.get(partition) match {
              case Some(leaderIsrAndControllerEpoch) =>
                controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                  replica.topicPartition,
                  leaderIsrAndControllerEpoch,
                  controllerContext.partitionReplicaAssignment(partition), isNew = false)
              case None =>
            }
        }
        logSuccessfulTransition(replicaId, partition, replicaState(replica), OnlineReplica)
        replicaState.put(replica, OnlineReplica)
      }
  }
}
```
A replica may move to OnlineReplica from any of NewReplica, OnlineReplica, OfflineReplica and ReplicaDeletionIneligible.
For a replica coming from NewReplica, the method only checks the cached replica assignment and adds the replica if it is missing; for the other source states it queues a LeaderAndIsr request so the data is synchronized across brokers.
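The split into validReplicas and invalidReplicas above relies on isValidTransition, which the article does not show. A minimal sketch of what it presumably looks like, based on the validPreviousStates sets introduced earlier (assumed, not copied from the source):

```scala
// Minimal sketch (assumption): a transition to targetState is legal only if the
// replica's current cached state is one of targetState's validPreviousStates.
private def isValidTransition(replica: PartitionAndReplica, targetState: ReplicaState): Boolean =
  targetState.validPreviousStates.contains(replicaState(replica))
```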
That completes the startup of the replica state machine. The LeaderAndIsr request, one of the most important requests in Kafka, will be analyzed in a separate article.
Partition state machine
Compared with the replica state machine, the partition state machine has only 4 states, but because it is involved in leader election, its state transitions are considerably more complex.
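For reference, the four partition states and their legal predecessor states are sketched below in the same style as the replica-state sketch above. This is again reconstructed from memory of the 1.1.x PartitionStateMachine and should be checked against the actual source tree.

```scala
// Hedged paraphrase (from memory of the 1.1.x PartitionStateMachine; verify against your version):
// target partition state -> states it may be entered from.
val validPreviousPartitionStates: Map[String, Set[String]] = Map(
  "NewPartition"         -> Set("NonExistentPartition"),
  "OnlinePartition"      -> Set("NewPartition", "OnlinePartition", "OfflinePartition"),
  "OfflinePartition"     -> Set("NewPartition", "OnlinePartition", "OfflinePartition"),
  "NonExistentPartition" -> Set("OfflinePartition")
)
```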
PartitionStateMachine's startup method is as follows:
```scala
def startup() {
  initializePartitionState()
  triggerOnlinePartitionStateChange()
}
```
Initializing the partition state cache
Much like the replica state machine, initializePartitionState records each partition's state in a partitionState cache.
Every partition in the cache is assigned one of 3 initial states:
If the partition has a leader replica and that leader is online, it is marked OnlinePartition; if the leader is not online, it is marked OfflinePartition.
If the partition has no leader yet, it is marked NewPartition.
```scala
private def initializePartitionState() {
  for (topicPartition <- controllerContext.allPartitions) {
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case Some(currentLeaderIsrAndEpoch) =>
        if (controllerContext.isReplicaOnline(currentLeaderIsrAndEpoch.leaderAndIsr.leader, topicPartition))
          partitionState.put(topicPartition, OnlinePartition)
        else
          partitionState.put(topicPartition, OfflinePartition)
      case None =>
        partitionState.put(topicPartition, NewPartition)
    }
  }
}
```
Once partitionState has been initialized, the partition state machine tries to move every OfflinePartition and NewPartition partition to OnlinePartition.
When the brokers are running normally, all partitions are already in the OnlinePartition state, so the filtered sequence is empty and handleStateChanges does nothing.
```scala
def triggerOnlinePartitionStateChange() {
  val partitionsToTrigger = partitionState.filter { case (partition, partitionState) =>
    !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic) &&
      (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
  }.keys.toSeq
  handleStateChanges(partitionsToTrigger, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
}

def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                       partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
  if (partitions.nonEmpty) {
    try {
      controllerBrokerRequestBatch.newBatch()
      doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
      controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
    } catch {
      case e: Throwable => error(s"Error while moving some partitions to $targetState state", e)
    }
  }
}
```
Partition leader election
doHandleStateChanges is mainly about electing a leader replica for the partitions. It first splits them into two groups:
Uninitialized partitions (uninitializedPartitions): partitions in the NewPartition state
Partitions that need a leader election (partitionsToElectLeader): partitions in the OfflinePartition or OnlinePartition state
doHandleStateChanges elects a leader for both groups and records the results in the partitionState cache mentioned earlier.
Note: the election strategy passed in from triggerOnlinePartitionStateChange is OfflinePartitionLeaderElectionStrategy.
```scala
private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState,
                                 partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = {
  val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
  partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition))
  val (validPartitions, invalidPartitions) = partitions.partition(partition => isValidTransition(partition, targetState))
  invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
  targetState match {
    case OnlinePartition =>
      val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition)
      val partitionsToElectLeader = validPartitions.filter(partition =>
        partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition)
      if (uninitializedPartitions.nonEmpty) {
        val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
        successfulInitializations.foreach { partition =>
          stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
            s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
          partitionState.put(partition, OnlinePartition)
        }
      }
      if (partitionsToElectLeader.nonEmpty) {
        val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get)
        successfulElections.foreach { partition =>
          stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
            s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
          partitionState.put(partition, OnlinePartition)
        }
      }
  }
}
```
The following sections describe how a leader replica is elected for each of the two groups.
Electing a leader for partitions in the NewPartition state
initializeLeaderAndIsrForPartitions elects a leader replica for partitions in the NewPartition state.
The code is long, but it can be summarized in one sentence: pick the first replica in the list of live replicas as the leader, write the result to the partition's state znode in ZooKeeper, update the local cache, and send LeaderAndIsr requests to propagate it to the other brokers.
```scala
private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
  val successfulInitializations = mutable.Buffer.empty[TopicPartition]
  val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
  val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
    val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
    partition -> liveReplicasForPartition
  }
  val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) =>
    liveReplicas.isEmpty
  }
  partitionsWithoutLiveReplicas.foreach { case (partition, replicas) =>
    val failMsg = s"Controller $controllerId epoch ${controllerContext.epoch} encountered error during state change of " +
      s"partition $partition from New to Online, assigned replicas are " +
      s"[${replicas.mkString(",")}], live brokers are [${controllerContext.liveBrokerIds}]. No assigned " +
      "replica is alive."
    logFailedStateChange(partition, NewPartition, OnlinePartition, new StateChangeFailedException(failMsg))
  }
  val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
    val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
    partition -> leaderIsrAndControllerEpoch
  }.toMap
  val createResponses = try {
    zkClient.createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
  } catch {
    case e: Exception =>
      partitionsWithLiveReplicas.foreach { case (partition, _) =>
        logFailedStateChange(partition, partitionState(partition), NewPartition, e)
      }
      Seq.empty
  }
  createResponses.foreach { createResponse =>
    val code = createResponse.resultCode
    val partition = createResponse.ctx.get.asInstanceOf[TopicPartition]
    val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochs(partition)
    if (code == Code.OK) {
      controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
      controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(leaderIsrAndControllerEpoch.leaderAndIsr.isr,
        partition, leaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(partition), isNew = true)
      successfulInitializations += partition
    } else {
      logFailedStateChange(partition, NewPartition, OnlinePartition, code)
    }
  }
  successfulInitializations
}
```
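As a quick illustration of the leader/ISR choice above (the broker ids are made up for the example): the first live replica in assignment order becomes the leader, and all live replicas form the initial ISR.

```scala
// Toy illustration of the initial leader/ISR choice; broker ids are hypothetical.
object NewPartitionLeaderDemo extends App {
  val assignment  = Seq(2, 0, 1)   // hypothetical replica assignment for one partition
  val liveBrokers = Set(0, 1)      // assume broker 2 is currently down
  val liveReplicas = assignment.filter(liveBrokers.contains)
  // Mirrors LeaderAndIsr(liveReplicas.head, liveReplicas.toList) in the code above.
  println(s"leader=${liveReplicas.head}, isr=${liveReplicas.toList}")   // leader=0, isr=List(0, 1)
}
```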
Electing a leader for partitions in the OfflinePartition/OnlinePartition state
electLeaderForPartitions elects leader replicas for all partitions in the OfflinePartition or OnlinePartition state.
The per-partition election is implemented in doElectLeaderForPartitions. There is a lot of code, but the core is the same as before: elect a leader replica, write it back to ZooKeeper, update the local cache, and send LeaderAndIsr requests to propagate it to the other brokers.
Leader elections happen in several different situations, so there are 4 election strategies; given the argument passed in earlier, the strategy used here is OfflinePartitionLeaderElectionStrategy.
```scala
private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy):
  (Seq[TopicPartition], Seq[TopicPartition], Map[TopicPartition, Exception]) = {
  val getDataResponses = try {
    zkClient.getTopicPartitionStatesRaw(partitions)
  } catch {
    case e: Exception =>
      return (Seq.empty, Seq.empty, partitions.map(_ -> e).toMap)
  }
  val failedElections = mutable.Map.empty[TopicPartition, Exception]
  val leaderIsrAndControllerEpochPerPartition = mutable.Buffer.empty[(TopicPartition, LeaderIsrAndControllerEpoch)]
  getDataResponses.foreach { getDataResponse =>
    val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
    val currState = partitionState(partition)
    if (getDataResponse.resultCode == Code.OK) {
      val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
      if (leaderIsrAndControllerEpochOpt.isEmpty) {
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        failedElections.put(partition, exception)
      }
      leaderIsrAndControllerEpochPerPartition += partition -> leaderIsrAndControllerEpochOpt.get
    } else if (getDataResponse.resultCode == Code.NONODE) {
      val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
      failedElections.put(partition, exception)
    } else {
      failedElections.put(partition, getDataResponse.resultException.get)
    }
  }
  val (invalidPartitionsForElection, validPartitionsForElection) = leaderIsrAndControllerEpochPerPartition.partition { case (_, leaderIsrAndControllerEpoch) =>
    leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch
  }
  invalidPartitionsForElection.foreach { case (partition, leaderIsrAndControllerEpoch) =>
    val failMsg = s"aborted leader election for partition $partition since the LeaderAndIsr path was " +
      s"already written by another controller. This probably means that the current controller $controllerId went through " +
      s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
    failedElections.put(partition, new StateChangeFailedException(failMsg))
  }
  if (validPartitionsForElection.isEmpty) {
    return (Seq.empty, Seq.empty, failedElections.toMap)
  }
  val shuttingDownBrokers = controllerContext.shuttingDownBrokerIds.toSet
  val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
    case OfflinePartitionLeaderElectionStrategy =>
      leaderForOffline(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
    case ReassignPartitionLeaderElectionStrategy =>
      leaderForReassign(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
    case PreferredReplicaPartitionLeaderElectionStrategy =>
      leaderForPreferredReplica(validPartitionsForElection).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
    case ControlledShutdownPartitionLeaderElectionStrategy =>
      leaderForControlledShutdown(validPartitionsForElection, shuttingDownBrokers).partition { case (_, newLeaderAndIsrOpt, _) => newLeaderAndIsrOpt.isEmpty }
  }
  partitionsWithoutLeaders.foreach { case (partition, _, _) =>
    val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
    failedElections.put(partition, new StateChangeFailedException(failMsg))
  }
  val recipientsPerPartition = partitionsWithLeaders.map { case (partition, _, recipients) => partition -> recipients }.toMap
  val adjustedLeaderAndIsrs = partitionsWithLeaders.map { case (partition, leaderAndIsrOpt, _) => partition -> leaderAndIsrOpt.get }.toMap
  val UpdateLeaderAndIsrResult(successfulUpdates, updatesToRetry, failedUpdates) = zkClient.updateLeaderAndIsr(
    adjustedLeaderAndIsrs, controllerContext.epoch)
  successfulUpdates.foreach { case (partition, leaderAndIsr) =>
    val replicas = controllerContext.partitionReplicaAssignment(partition)
    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
      leaderIsrAndControllerEpoch, replicas, isNew = false)
  }
  (successfulUpdates.keys.toSeq, updatesToRetry, failedElections.toMap ++ failedUpdates)
}
```
The election algorithm for OfflinePartitionLeaderElectionStrategy is implemented in leaderForOffline.
The leaderForOffline election
The election is constrained by the unclean.leader.election.enable configuration, which can also be set at the topic level. In production it is usually set to false; if it is enabled, a leader may be elected from replicas outside the ISR, which can cause data inconsistency.
```scala
private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]):
  Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
  val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) = leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
    val liveInSyncReplicas = leaderIsrAndControllerEpoch.leaderAndIsr.isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
    liveInSyncReplicas.isEmpty
  }
  val (logConfigs, failed) = zkClient.getLogConfigs(partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic },
    config.originals())
  val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
    if (failed.contains(partition.topic)) {
      logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
      (partition, None, false)
    } else {
      (partition, Option(leaderIsrAndControllerEpoch), logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue())
    }
  } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
    (partition, Option(leaderIsrAndControllerEpoch), false)
  }
  partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
    val assignment = controllerContext.partitionReplicaAssignment(partition)
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
    if (leaderIsrAndControllerEpochOpt.nonEmpty) {
      val leaderIsrAndControllerEpoch = leaderIsrAndControllerEpochOpt.get
      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
      val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(assignment, isr, liveReplicas.toSet,
        uncleanLeaderElectionEnabled, controllerContext)
      val newLeaderAndIsrOpt = leaderOpt.map { leader =>
        val newIsr =
          if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
          else List(leader)
        leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
      }
      (partition, newLeaderAndIsrOpt, liveReplicas)
    } else {
      (partition, None, liveReplicas)
    }
  }
}
```
The final leader selection itself is implemented in PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection.
The election algorithm
The algorithm is fairly simple: walk the assigned replicas in order and pick the first one that is both in the ISR and alive as the leader.
If no such replica exists and unclean.leader.election.enable=true, pick the first live replica from the full assignment instead (an unclean election).
```scala
def offlinePartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int],
                                   uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
  assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
    if (uncleanLeaderElectionEnabled) {
      val leaderOpt = assignment.find(liveReplicas.contains)
      if (!leaderOpt.isEmpty)
        controllerContext.stats.uncleanLeaderElectionRate.mark()
      leaderOpt
    } else {
      None
    }
  }
}
```
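To make the selection order concrete, here is a small self-contained toy version of the same logic. The controllerContext parameter, which is only used to mark the unclean-election meter, is dropped, and the broker ids are made up for the example.

```scala
// Toy, self-contained copy of the selection logic above; the metrics call is omitted.
object OfflineElectionDemo extends App {
  def electLeader(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], uncleanEnabled: Boolean): Option[Int] =
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
      if (uncleanEnabled) assignment.find(liveReplicas.contains) else None
    }

  // Replica 3 is live and in the ISR, so it wins even though replica 2 (earlier in the assignment) is also live.
  println(electLeader(Seq(1, 2, 3), isr = Seq(3), liveReplicas = Set(2, 3), uncleanEnabled = false)) // Some(3)
  // No live ISR replica: a clean election fails ...
  println(electLeader(Seq(1, 2, 3), isr = Seq(1), liveReplicas = Set(2, 3), uncleanEnabled = false)) // None
  // ... but an unclean election falls back to the first live replica in assignment order.
  println(electLeader(Seq(1, 2, 3), isr = Seq(1), liveReplicas = Set(2, 3), uncleanEnabled = true))  // Some(2)
}
```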
Summary
This completes the startup of the replica state machine and the partition state machine. Both follow the same pattern: initialize a state cache, then perform an initial round of state transitions, updating the ControllerContext and the data in ZooKeeper along the way. The partition state machine additionally has to elect a leader replica for each partition.