override def doWork() {
  maybeTruncate()
  // Build the fetch request
  val fetchRequest = inLock(partitionMapLock) {
    val ResultWithPartitions(fetchRequest, partitionsWithError) = buildFetchRequest(states)
    if (fetchRequest.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      // The back-off interval comes from replica.fetch.backoff.ms
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    handlePartitionsWithErrors(partitionsWithError)
    fetchRequest
  }
  if (!fetchRequest.isEmpty)
    processFetchRequest(fetchRequest)
}
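doWork() is invoked over and over by the fetcher thread's run loop: AbstractFetcherThread extends ShutdownableThread, whose run() keeps calling doWork() until shutdown is requested. A minimal sketch of that driving loop, with a simplified class that only models the running flag (the names other than doWork() are assumptions for illustration):

// Illustrative sketch of the loop that drives doWork(); not the real ShutdownableThread,
// which additionally handles shutdown latches and InterruptedException.
abstract class SimplifiedShutdownableThread(name: String) extends Thread(name) {
  @volatile private var running = true

  // Implemented by the fetcher thread as shown above: one truncate + fetch round per call.
  def doWork(): Unit

  override def run(): Unit = {
    while (running)
      doWork()
  }

  def initiateShutdown(): Unit = running = false
}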
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
  // Here `replica` is the remote follower replica
  val replicaId = replica.brokerId
  // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
  // The LW is the minimum logStartOffset across all replicas
  val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // Update the follower's fetch state with this read result
  replica.updateLogReadResult(logReadResult)
  // The new LW after the update
  val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // check if the LW of the partition has incremented
  // since the replica's logStartOffset may have incremented
  val leaderLWIncremented = newLeaderLW > oldLeaderLW
  // check if we need to expand ISR to include this replica
  // if it is not in the ISR yet
  // Possibly add this follower to the ISR
  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)
  val result = leaderLWIncremented || leaderHWIncremented
  // some delayed operations may be unblocked after HW or LW changed
  if (result)
    // Try to complete pending delayed operations: produce, fetch, deleteRecords
    tryCompleteDelayedRequests()
debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.") result }
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val replica = getReplica(replicaId).get
        val leaderHW = leaderReplica.highWatermark
        val fetchOffset = logReadResult.info.fetchOffsetMetadata.messageOffset
        // The follower is added only if it is an assigned replica that is not yet in the ISR
        // and has caught up: its LEO has reached the leader HW and its fetch offset is past
        // the start offset of the current leader epoch
        if (!inSyncReplicas.contains(replica) &&
           assignedReplicas.map(_.brokerId).contains(replicaId) &&
           replica.logEndOffset.offsetDiff(leaderHW) >= 0 &&
           leaderEpochStartOffsetOpt.exists(fetchOffset >= _)) {
          // Add the replica to the ISR set
          val newInSyncReplicas = inSyncReplicas + replica
          // update ISR in ZK and cache
          // Write the new ISR to the partition's state node in ZK and record it in the local isrChangeSet cache
          updateIsr(newInSyncReplicas)
          // metrics
          replicaManager.isrExpandRate.mark()
        }
        // Try to advance the leader's HW, since a follower has just entered the ISR
        // check if the HW of the partition can now be incremented
        // since the replica may already be in the ISR and its LEO has just incremented
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)

      case None =>
        false // nothing to do if no longer leader
    }
  }
}
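maybeIncrementLeaderHW follows the same rule sketched earlier: the HW can only move up to the smallest in-sync LEO. A minimal, self-contained sketch of that rule (a simplification of the real method, which also looks at full offset metadata; the names here are illustrative):

// Illustrative only: advance the HW when the smallest in-sync LEO has moved past it.
def maybeAdvanceHW(currentHW: Long, inSyncLeos: Seq[Long]): Option[Long] = {
  val newHW = inSyncLeos.min          // candidate HW = min LEO across the in-sync replicas
  if (newHW > currentHW) Some(newHW)  // HW advanced; delayed produce/fetch may now complete
  else None                           // no change
}

// Example: followers at LEO 110 and 120, leader at 120, current HW 100 -> HW advances to 110.
// maybeAdvanceHW(100L, Seq(120L, 110L, 120L)) == Some(110L)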
responseData.foreach { case (topicPartition, partitionData) =>
  val topic = topicPartition.topic
  val partitionId = topicPartition.partition
  Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
    // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
    // In this case, we only want to process the fetch response if the partition state is ready for fetch and
    // the current offset is the same as the offset requested.
    if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.fetchOffset &&
        currentPartitionFetchState.isReadyForFetch) {
      partitionData.error match {
        case Errors.NONE =>
          try {
            // Update the lag metric (FetcherLagStats): lag = HW - lastOffset; lag <= 0 means this follower is in sync
            fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
            // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
            // Arguments: the partition, the fetchOffset used for this fetch request, and the fetched partition data
            processPartitionData(topicPartition, currentPartitionFetchState.fetchOffset, partitionData)
            val validBytes = records.validBytes
            // ReplicaDirAlterThread may have removed topicPartition from the partitionStates after processing the partition data
            if (validBytes > 0 && partitionStates.contains(topicPartition)) {
              // Advance the partition's fetch state to PartitionFetchState(newOffset, 0, false)
              // Update partitionStates only if there is no exception during processPartitionData
              partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
              // metrics ...
              fetcherStats.byteRate.mark(validBytes)
            }
          }
      }
    })
}
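The lag recorded above is simply the distance between the leader HW reported in the response and the next offset this follower will fetch, floored at zero. A tiny sketch of that computation (names are illustrative):

// Illustrative only: per-partition fetcher lag derived from the leader HW and the next fetch offset.
def fetcherLag(highWatermark: Long, newFetchOffset: Long): Long =
  math.max(0L, highWatermark - newFetchOffset)

// fetcherLag(100L, 100L) == 0   -> the follower has read up to the HW and counts as in sync
// fetcherLag(100L, 90L)  == 10  -> the follower is 10 messages behind the HW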
    // for the follower replica, we do not need to keep
    // its segment base offset and physical position;
    // these values will be computed upon making the leader
    val leaderLogStartOffset = partitionData.logStartOffset
    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)
    // Traffic from both in-sync and out-of-sync replicas is accounted for in the replication quota
    // to ensure total replication traffic doesn't exceed the quota.
    if (quota.isThrottled(topicPartition))
      quota.record(records.sizeInBytes)
    replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
  }
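The quota object only records the fetched bytes here; the enforcement happens later, when the next fetch request is built and throttled partitions are left out while the quota is exhausted. A minimal sketch of a record-then-check rate model under a simple fixed-window assumption (the class and names below are illustrative, not the actual ReplicationQuotaManager API):

// Illustrative only: a toy replication quota that records fetched bytes and reports whether
// the observed rate exceeds a configured bytes-per-second limit.
final class ToyReplicationQuota(bytesPerSecLimit: Long) {
  private var windowStartMs = System.currentTimeMillis()
  private var bytesInWindow = 0L

  def record(bytes: Long): Unit = {
    val now = System.currentTimeMillis()
    if (now - windowStartMs >= 1000L) {   // roll over to a new one-second window
      windowStartMs = now
      bytesInWindow = 0L
    }
    bytesInWindow += bytes
  }

  // True when the current window has already consumed more than the allowed rate; a fetcher
  // using this sketch would then skip throttled partitions in its next fetch request.
  def isQuotaExceeded: Boolean = bytesInWindow > bytesPerSecLimit
}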