// NOTE(review): fragment of an enclosing method that begins before this excerpt; newBrokerIds,
// deadBrokerIds and curBrokerIds are defined there (presumably collections of Int broker ids — TODO confirm).
// Sorted copies of each id collection; at least the log line below uses them, so brokers print in a stable order.
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
// Log the membership change: brokers added, brokers removed, and the current full set (curBrokerIds).
info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
  s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
/**
 * Callback invoked when one or more brokers start up (newly added or restarted).
 *
 * In order, it:
 *  1. removes the starting brokers' entries from `controllerContext.replicasOnOfflineDirs`;
 *  2. sends an UpdateMetadataRequest to all live and shutting-down brokers so existing brokers learn of the new ones;
 *  3. requests the OnlineReplica state for every replica hosted on the new brokers;
 *  4. triggers OnlinePartition state changes so the new brokers can become leaders of new/offline partitions;
 *  5. restarts any in-flight reassignment whose new replica set includes one of the new brokers;
 *  6. resumes deletion of queued-for-deletion topics that have a replica on one of the new brokers;
 *  7. registers broker-modification handlers for the new brokers.
 *
 * @param newBrokers ids of the brokers that started up
 */
private def onBrokerStartup(newBrokers: Seq[Int]): Unit = {
  info(s"New broker startup callback for ${newBrokers.mkString(",")}")
  newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  val newBrokersSet = newBrokers.toSet
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
  // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
  // to see if these brokers can become leaders for some/all of those
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // check if reassignment of some partitions need to be restarted: only reassignments whose
  // new replica set contains one of the starting brokers can make progress now
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
  }
  partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
  // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
  // on the newly restarted brokers, there is a chance that topic deletion can resume
  val replicasForTopicsToBeDeleted =
    allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
      s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
      s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
    topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
  registerBrokerModificationsHandler(newBrokers)
}
/**
 * Callback invoked when one or more brokers are detected as dead (offline).
 *
 * NOTE(review): this block appears to be an abridged excerpt. It uses `partitionsWithoutLeader`,
 * `newOfflineReplicasForDeletion` and `newOfflineReplicasNotForDeletion`, none of which is defined in
 * the visible code. `allReplicasOnDeadBrokers` is computed but never used. Presumably the elided code
 * derives those three values from `allReplicasOnDeadBrokers` (upstream Kafka does this in a separate
 * `onReplicasBecomeOffline` method) — TODO confirm. As shown, this block does not compile.
 * NOTE(review): `def f(...){` procedure syntax is deprecated in Scala 2.13 (removed in Scala 3);
 * prefer `: Unit =`.
 *
 * @param deadBrokers ids of the brokers that went offline
 */
private def onBrokerFailure(deadBrokers: Seq[Int]){
  info(s"Broker failure callback for ${deadBrokers.mkString(",")}")
  // Remove the dead brokers' entries from the cached controllerContext.replicasOnOfflineDirs
  // (presumably keyed by broker id — TODO confirm)
  deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  // Side-effecting filter: removes every dead broker from shuttingDownBrokerIds and keeps the ids whose
  // removal reported success, i.e. the brokers that were in the middle of shutting down
  // (assumes shuttingDownBrokerIds is a mutable Set whose remove returns true if the id was present)
  val deadBrokersThatWereShuttingDown = deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info(s"Removed $deadBrokersThatWereShuttingDown from list of shutting down brokers.")
  // All replicas hosted on the dead brokers
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)

  // trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
  // i.e. mark the partitions that lost their leader as OfflinePartition
  partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
  // trigger OnlinePartition state changes for offline or new partitions; this is where leaders are
  // (re-)elected for those partitions from their remaining replicas
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // trigger OfflineReplica state change for those newly offline replicas
  // (presumably the dead brokers' replicas that do not belong to topics queued for deletion — see NOTE above)
  replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)

  // The hosting brokers are offline, so deletion of these replicas cannot succeed:
  // fail deletion of topics that are affected by the offline replicas
  if (newOfflineReplicasForDeletion.nonEmpty) {
    // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
    // deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state indefinitely
    // since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
    topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
  }

  // If replica failure did not require leader re-election, inform brokers of the offline replica
  // Note that during leader re-election, brokers update their metadata
  // i.e. when no partition lost its leader, send an UpdateMetadataRequest to all live and shutting-down brokers
  if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}