override def process(): Unit = {
  // Nothing to do unless this controller is active.
  if (!isActive) return
  // We need to register the watcher if the path doesn't exist in order to detect future reassignments and we get
  // the `path exists` check for free.
  // Registers partitionReassignmentHandler; the body below only runs when the check reports that the path exists.
  if (zkClient.registerZNodeChangeHandlerAndCheckExistence(partitionReassignmentHandler)) {
    // Read the requested reassignment plan: partition -> new replicas.
    val partitionReassignment = zkClient.getPartitionReassignment
    // Populate `partitionsBeingReassigned` with all partitions being reassigned before invoking
    // `maybeTriggerPartitionReassignment` (see method documentation for the reason)
    partitionReassignment.foreach { case (tp, newReplicas) =>
      // Handler for ISR change events of this partition while it is being reassigned.
      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(KafkaController.this, eventManager, tp)
      // Cache the reassignment: the target replicas together with the ISR change handler.
      controllerContext.partitionsBeingReassigned.put(tp, ReassignedPartitionsContext(newReplicas, reassignIsrChangeHandler))
    }
    // NOTE(review): the rest of this method is not visible in this chunk.
// Decide, for every requested partition, whether to start its reassignment or to drop the request.
// NOTE(review): `topicPartitions` and `partitionsToBeRemovedFromReassignment` are defined outside this
// chunk; the latter is presumably a mutable set (it is filled via `add`) — confirm against the header.
topicPartitions.foreach { tp =>
  if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) {
    error(s"Skipping reassignment of $tp since the topic is currently being deleted")
    partitionsToBeRemovedFromReassignment.add(tp)
  } else {
    val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse {
      // Every partition handled here is expected to be cached in partitionsBeingReassigned already;
      // a missing entry is treated as an illegal state.
      throw new IllegalStateException(s"Initiating reassign replicas for partition $tp not present in " +
        s"partitionsBeingReassigned: ${controllerContext.partitionsBeingReassigned.mkString(", ")}")
    }
    val newReplicas = reassignedPartitionContext.newReplicas
    val topic = tp.topic
    val assignedReplicas = controllerContext.partitionReplicaAssignment(tp)
    if (assignedReplicas.nonEmpty) {
      if (assignedReplicas == newReplicas) {
        // Already on the target replicas: nothing to move, so the request is dropped.
        info(s"Partition $tp to be reassigned is already assigned to replicas " +
          s"${newReplicas.mkString(",")}. Ignoring request for partition reassignment.")
        partitionsToBeRemovedFromReassignment.add(tp)
      } else {
        try {
          info(s"Handling reassignment of partition $tp to new replicas ${newReplicas.mkString(",")}")
          // first register ISR change listener (the PartitionReassignmentIsrChangeHandler)
          reassignedPartitionContext.registerReassignIsrChangeHandler(zkClient)
          // mark topic ineligible for deletion for the partitions being reassigned
          topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
          // Hand the partition to the replica reassignment routine.
          onPartitionReassignment(tp, reassignedPartitionContext)
        } catch {
          // NOTE(review): this also catches fatal errors (e.g. OutOfMemoryError); consider NonFatal.
          case e: Throwable =>
            error(s"Error completing reassignment of partition $tp", e)
            // remove the partition from the admin path to unblock the admin client
            partitionsToBeRemovedFromReassignment.add(tp)
        }
      }
    } else {
      // No replica assignment is cached for this partition, so it is treated as nonexistent.
      error(s"Ignoring request to reassign partition $tp that doesn't exist.")
      partitionsToBeRemovedFromReassignment.add(tp)
    }
  }
}
// Drop every partition collected above from the pending reassignment in a single call.
removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment)
// NOTE(review): this brace closes an enclosing definition whose header is outside this chunk.
}
/**
 * Advances the reassignment of `topicPartition` towards the replica list held in `reassignedPartitionContext`.
 *
 * Abbreviations used in the step comments:
 *   - RAR: the reassigned (target) replicas, `reassignedPartitionContext.newReplicas`
 *   - OAR: the original assigned replicas, as cached in `controllerContext` on entry to a phase
 *   - AR:  the assigned replicas recorded for the partition
 *
 * While some replica in RAR is not yet in the ISR (steps 1-3):
 *   - the assignment is widened to OAR + RAR;
 *   - a LeaderAndIsr request is sent;
 *   - the replicas in RAR - OAR are started.
 *
 * Once every replica in RAR is in the ISR (steps 4-12):
 *   - the RAR replicas are moved to OnlineReplica and the leader is moved if required;
 *   - the replicas in OAR - RAR are stopped;
 *   - the assignment is narrowed to RAR and the partition is removed from the reassignment path;
 *   - an UpdateMetadata request is sent to every live or shutting-down broker.
 *
 * @param topicPartition             the partition being reassigned
 * @param reassignedPartitionContext holds the target replicas (`newReplicas`) for the partition
 */
private def onPartitionReassignment(topicPartition: TopicPartition,
                                    reassignedPartitionContext: ReassignedPartitionsContext): Unit = {
  // RAR: the target replica list.
  val reassignedReplicas = reassignedPartitionContext.newReplicas
  if (!areReplicasInIsr(topicPartition, reassignedReplicas)) {
    // Phase 1: not every replica in RAR has caught up with the leader yet.
    info(s"New replicas ${reassignedReplicas.mkString(",")} for partition $topicPartition being reassigned not yet " +
      "caught up with the leader")
    // OAR: the cached assignment, read once before step 1 replaces it.
    val originalReplicas = controllerContext.partitionReplicaAssignment(topicPartition)
    // RAR - OAR: the replicas this reassignment has to create.
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- originalReplicas.toSet
    // OAR + RAR without duplicates. `distinct` keeps a deterministic order (RAR first, then the remaining
    // OAR members); a Set has an unspecified iteration order once it holds more than four elements, which
    // previously leaked into the replica order written to ZK and sent to the brokers.
    val newAndOldReplicas = (reassignedReplicas ++ originalReplicas).distinct
    //1. Update AR in ZK with OAR + RAR.
    updateAssignedReplicasForPartition(topicPartition, newAndOldReplicas)
    //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
    // The cached assignment is deliberately re-read here because step 1 may have replaced it.
    // NOTE(review): it presumably equals newAndOldReplicas now — confirm in
    // updateAssignedReplicasForPartition before simplifying this call.
    updateLeaderEpochAndSendRequest(topicPartition, controllerContext.partitionReplicaAssignment(topicPartition),
      newAndOldReplicas)
    //3. replicas in RAR - OAR -> NewReplica
    startNewReplicasForReassignedPartition(topicPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info(s"Waiting for new replicas ${reassignedReplicas.mkString(",")} for partition ${topicPartition} being " +
      "reassigned to catch up with the leader")
  } else {
    //4. Wait until all replicas in RAR are in sync with the leader.
    // OAR - RAR: replicas of the current assignment that are not part of the target list.
    val oldReplicas = controllerContext.partitionReplicaAssignment(topicPartition).toSet -- reassignedReplicas.toSet
    //5. replicas in RAR -> OnlineReplica (all of them are in the ISR at this point)
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topicPartition, replica)), OnlineReplica)
    }
    //6. Set AR to RAR in memory.
    //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
    //   a new AR (using RAR) and same isr to every broker in RAR
    moveReassignedPartitionLeaderIfRequired(topicPartition, reassignedPartitionContext)
    //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
    //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
    stopOldReplicasOfReassignedPartition(topicPartition, reassignedPartitionContext, oldReplicas)
    //10. Update AR in ZK with RAR.
    updateAssignedReplicasForPartition(topicPartition, reassignedReplicas)
    //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
    removePartitionsFromReassignedPartitions(Set(topicPartition))
    //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicPartition))
    // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
    topicDeletionManager.resumeDeletionForTopics(Set(topicPartition.topic))
  }
}