private def onBrokerStartup(newBrokers: Seq[Int]) {
  info(s"New broker startup callback for ${newBrokers.mkString(",")}")
  newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
  val newBrokersSet = newBrokers.toSet
  // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
  // broker via this update.
  // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
  // common controlled shutdown case, the metadata will reach the new brokers faster
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
  // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers.toSeq, OnlineReplica)
  // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
  // to see if these brokers can become leaders for some/all of those
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // check if reassignment of some partitions need to be restarted
  val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
  }
  partitionsWithReplicasOnNewBrokers.foreach { case (tp, context) => onPartitionReassignment(tp, context) }
  // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
  // on the newly restarted brokers, there is a chance that topic deletion can resume
  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
      s"${topicDeletionManager.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
      s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
    topicDeletionManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
  registerBrokerModificationsHandler(newBrokers)
}
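The reassignment filter in the middle of this callback is easy to miss. Below is a minimal, self-contained sketch of that selection step: keep only the in-flight reassignments whose target replica list names at least one of the brokers that just started. The TopicPartition and ReassignmentContext case classes here are simplified stand-ins for the controller's real types, used purely for illustration.

object ReassignmentFilterSketch extends App {
  case class TopicPartition(topic: String, partition: Int)
  case class ReassignmentContext(newReplicas: Seq[Int])

  // in-flight reassignments, keyed by partition (illustrative data)
  val partitionsBeingReassigned = Map(
    TopicPartition("orders", 0) -> ReassignmentContext(Seq(1, 4)),
    TopicPartition("clicks", 2) -> ReassignmentContext(Seq(2, 3))
  )
  val newBrokersSet = Set(4, 5) // brokers that just came online

  // same shape as the filter in onBrokerStartup: keep reassignments whose
  // target replicas intersect the newly started brokers
  val partitionsWithReplicasOnNewBrokers = partitionsBeingReassigned.filter {
    case (_, ctx) => ctx.newReplicas.exists(newBrokersSet.contains)
  }
  println(partitionsWithReplicasOnNewBrokers.keys.mkString(",")) // TopicPartition(orders,0)
}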
// trigger OfflinePartition state for all partitions whose current leader is one amongst the newOfflineReplicas
// (i.e. mark these partitions as OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsWithoutLeader.toSeq, OfflinePartition)
// trigger OnlinePartition state changes for offline or new partitions,
// electing a new leader from the remaining live replicas of each partition
partitionStateMachine.triggerOnlinePartitionStateChange()
// trigger OfflineReplica state change for those newly offline replicas:
// the replicas on the dead brokers move to the OfflineReplica state
replicaStateMachine.handleStateChanges(newOfflineReplicasNotForDeletion.toSeq, OfflineReplica)
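For context, partitionsWithoutLeader is not derived in this excerpt. Here is a plausible sketch of the idea, assuming the controller tracks a leader broker per partition (the leaders map and the types below are hypothetical stand-ins, not the controller's actual bookkeeping): collect the partitions whose current leader sits on a dead broker.

object OfflineLeaderSketch extends App {
  case class TopicPartition(topic: String, partition: Int)

  // partition -> current leader broker id (hypothetical bookkeeping)
  val leaders = Map(
    TopicPartition("orders", 0) -> 1,
    TopicPartition("orders", 1) -> 2,
    TopicPartition("clicks", 0) -> 2
  )
  val deadBrokers = Set(2)

  // partitions that just lost their leader and must go through the
  // OfflinePartition -> OnlinePartition re-election path shown above
  val partitionsWithoutLeader = leaders.collect {
    case (tp, leader) if deadBrokers.contains(leader) => tp
  }.toSet
  println(partitionsWithoutLeader.mkString(",")) // the two partitions led by broker 2
}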
// fail deletion of topics that are affected by the offline replicas: the broker
// hosting them is down, so these replicas cannot be deleted right now
if (newOfflineReplicasForDeletion.nonEmpty) {
  // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
  // deleted when its log directory is offline. This will prevent the replica from being in TopicDeletionStarted state
  // indefinitely since topic deletion cannot be retried until at least one replica is in TopicDeletionStarted state
  topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
}
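The two replica sets used above come from splitting the newly offline replicas by whether their topic is queued for deletion. A sketch of that split, again with simplified hypothetical types in place of the controller's and the topic deletion manager's:

object OfflineReplicaSplitSketch extends App {
  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  val newOfflineReplicas = Seq(
    PartitionAndReplica("orders", 0, 2),
    PartitionAndReplica("to-delete", 0, 2)
  )
  val topicsQueuedForDeletion = Set("to-delete") // stand-in for the deletion queue

  // replicas of topics pending deletion have their deletion failed;
  // the rest simply transition to OfflineReplica
  val (newOfflineReplicasForDeletion, newOfflineReplicasNotForDeletion) =
    newOfflineReplicas.partition(r => topicsQueuedForDeletion.contains(r.topic))
  println(s"fail deletion: $newOfflineReplicasForDeletion")
  println(s"mark OfflineReplica: $newOfflineReplicasNotForDeletion")
}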
// If replica failure did not require leader re-election, inform brokers of the offline replica.
// Note that during leader re-election, brokers update their metadata anyway, so an explicit
// UpdateMetadataRequest to the live brokers is only needed when no partition lost its leader
if (partitionsWithoutLeader.isEmpty) {
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}
}