// If activeControllerId != -1, a controller has already been elected (possibly
// this broker itself); bail out here, which also prevents an election loop in
// special cases.
if (activeControllerId != -1) {
  debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
  return
}

try {
  // Try to create the /controller znode; if the create fails because the node
  // already exists, the NodeExistsException is handled in the catch block below.
  zkClient.checkedEphemeralCreate(ControllerZNode.path, ControllerZNode.encode(config.brokerId, timestamp))
  info(s"${config.brokerId} successfully elected as the controller")
  activeControllerId = config.brokerId
  onControllerFailover()
} catch {
  case _: NodeExistsException =>
    // If someone else has written the path, then
    activeControllerId = zkClient.getControllerId.getOrElse(-1)

    // If /controller already exists, brokerid will not be -1, e.g.
    // {"version":1,"brokerid":0,"timestamp":"1582610063256"}
    if (activeControllerId != -1)
      debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}")
    else
      // The previous controller has just resigned and its znode has not been deleted yet
      warn("A controller has been elected but just resigned, this will result in another round of election")

  case e2: Throwable =>
    error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2)
    triggerControllerMove()
}
}
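The whole election hinges on ZooKeeper's ephemeral-node semantics. Below is a minimal sketch of the same pattern (not Kafka's code, just a raw ZooKeeper client): whichever broker creates /controller first wins, and the znode disappears along with the winner's session, which is what opens the next round of election.

import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

def tryBecomeController(zk: ZooKeeper, brokerId: Int): Boolean = {
  val data = s"""{"version":1,"brokerid":$brokerId,"timestamp":"${System.currentTimeMillis()}"}"""
  try {
    // EPHEMERAL: the znode is deleted when this broker's session ends,
    // which is exactly what triggers the next election round.
    zk.create("/controller", data.getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    true  // we won the election
  } catch {
    case _: KeeperException.NodeExistsException => false  // someone else won
  }
}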
info("Fetching topic deletions in progress") val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress() info("Initializing topic deletion manager") topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
info("Sending update metadata request")
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
info("Starting the controller scheduler") kafkaScheduler.startup() if (config.autoLeaderRebalanceEnable) { scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS) }
if (config.tokenAuthEnabled) {
  info("starting the token expiry check scheduler")
  tokenCleanScheduler.startup()
  tokenCleanScheduler.schedule(name = "delete-expired-tokens",
    fun = tokenManager.expireTokens,
    period = config.delegationTokenExpiryCheckIntervalMs,
    unit = TimeUnit.MILLISECONDS)
}
}
/**
 * Read controller.epoch from ZooKeeper, increment it by one, and write it back.
 */
info("Reading controller epoch from ZooKeeper")
// Read the /controller_epoch znode and initialize the ControllerContext's
// epoch and epochZkVersion fields
readControllerEpochFromZooKeeper()
info("Incrementing controller epoch in ZooKeeper")
incrementControllerEpoch()
info("Registering handlers")
Registering the node listeners
/**
 * Register a group of childChangeHandlers; when a NodeChildrenChange event
 * fires, it is dispatched to these handlers.
 */
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
val childChangeHandlers = Seq(brokerChangeHandler, topicChangeHandler, topicDeletionHandler,
  logDirEventNotificationHandler, isrChangeNotificationHandler)
childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler)
// Register handlers for the /admin/preferred_replica_election and /admin/reassign_partitions znodes.
// This is also a registration, but one that additionally checks whether the znode exists (the
// existence result is not acted on here; the call only guarantees the check raises no exception).
val nodeChangeHandlers = Seq(preferredReplicaElectionHandler, partitionReassignmentHandler)
nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence)
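These handlers ride on ZooKeeper watches, which are one-shot and must be re-armed on every callback. A hedged sketch of that pattern with a raw ZooKeeper watcher (Kafka wraps this behind its own handler traits; the class below is illustrative):

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

class BrokerChangeWatcher(zk: ZooKeeper) extends Watcher {
  def watchBrokers(): Unit = {
    // Passing `this` as the watcher arms a one-shot watch on the children
    zk.getChildren("/brokers/ids", this)
  }
  override def process(event: WatchedEvent): Unit = {
    if (event.getType == EventType.NodeChildrenChanged) {
      println(s"broker set changed under ${event.getPath}")
      watchBrokers()  // watches fire only once; re-register to keep listening
    }
  }
}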
// The controller epoch; in Kafka the epoch plays the role of an optimistic lock's version
var epoch: Int = KafkaController.InitialControllerEpoch - 1
// ZooKeeper's own znode version, used generically for conditional updates of node data
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
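The optimistic-lock analogy is literal: incrementControllerEpoch bumps the epoch with a conditional write, so a stale controller loses the race. A minimal sketch of that compare-and-set against /controller_epoch with a raw ZooKeeper client (the method name here is illustrative, not Kafka's API):

import org.apache.zookeeper.{KeeperException, ZooKeeper}
import org.apache.zookeeper.data.Stat

def incrementEpochSketch(zk: ZooKeeper): (Int, Int) = {
  val stat = new Stat()
  val current = new String(zk.getData("/controller_epoch", false, stat), "UTF-8").toInt
  try {
    // setData with an expected version is ZooKeeper's compare-and-set:
    // it throws BadVersionException if another broker updated the node first.
    val newStat = zk.setData("/controller_epoch", (current + 1).toString.getBytes("UTF-8"), stat.getVersion)
    (current + 1, newStat.getVersion)
  } catch {
    case _: KeeperException.BadVersionException =>
      throw new IllegalStateException("controller moved: epoch was bumped by another broker")
  }
}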
private def initializePartitionReassignment() {
  // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
  val partitionsBeingReassigned = zkClient.getPartitionReassignment
  info(s"Partitions being reassigned: $partitionsBeingReassigned")
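For reference, /admin/reassign_partitions holds the public reassignment JSON format, along these lines (topic name made up):

{"version":1,"partitions":[{"topic":"test-topic","partition":0,"replicas":[1,2,3]}]}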
// We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines
// are started. This is because brokers need to receive the list of live brokers from UpdateMetadataRequest before
// they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
// partitionStateMachine.startup().
// Refresh the metadata of all brokers and all partitions before any LeaderAndIsrRequest is processed
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)