// construct the fetch results from the read results
val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
  // collectFirst: find the first element matched by the partial function.
  // fetchInfos is the collection of (TopicPartition, PartitionData) request parameters;
  // the idea is to match the TopicPartition from the read result logReadResults against the
  // TopicPartition in fetchInfos and pick out that partition's PartitionData request parameter.
  val fetchInfo = fetchInfos.collectFirst {
    case (tp, v) if tp == topicPartition => v
  }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
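To make the collectFirst call concrete, here is a tiny self-contained example (made-up partition names and sizes, not Kafka code) showing how the first matching entry is picked out of the request parameters:

object CollectFirstDemo extends App {
  // (partition, maxBytes) pairs standing in for the (TopicPartition, PartitionData) request params
  val fetchInfos = Seq(("topicA-0", 1048576), ("topicB-1", 524288))
  val wanted = "topicB-1"
  val maxBytes = fetchInfos.collectFirst { case (tp, v) if tp == wanted => v }
    .getOrElse(sys.error(s"Partition $wanted not found in fetchInfos"))
  println(maxBytes) // 524288
}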
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
// the partitions are used as the watch keys for the delayed operation
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
// try to complete the request immediately, otherwise put it into the purgatory;
// this is because while the delayed fetch operation is being created, new requests
// may arrive and hence make this operation completable.
// try once first; if it cannot complete, put it into the purgatory
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
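For intuition, here is a rough sketch of the "try once, otherwise watch" idea behind tryCompleteElseWatch, with hypothetical names; the real DelayedOperationPurgatory additionally handles locking, expiration timers and watcher cleanup, all omitted here.

trait DelayedOp {
  def tryComplete(): Boolean // true if the operation can be answered right now
  def onComplete(): Unit     // builds and sends the response
}

class TinyPurgatory[K] {
  private val watchers = scala.collection.mutable.Map.empty[K, List[DelayedOp]]

  def tryCompleteElseWatch(op: DelayedOp, keys: Seq[K]): Boolean =
    if (op.tryComplete()) { op.onComplete(); true }
    else {
      // not completable yet: register the operation under every key (one key per partition)
      keys.foreach(k => watchers.update(k, op :: watchers.getOrElse(k, Nil)))
      false
    }

  // called when something changes for a key, e.g. new messages are appended to that partition
  def checkAndComplete(key: K): Unit = {
    val (done, pending) = watchers.getOrElse(key, Nil).partition(_.tryComplete())
    done.foreach(_.onComplete())
    watchers.update(key, pending)
  }
}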
// decide whether to only fetch from leader
val localReplica = if (fetchOnlyFromLeader)
  // look up the leader replica first
  getLeaderReplicaIfLocal(tp)
else
  getReplicaOrException(tp)
// decide whether to only fetch committed data (i.e. messages below high watermark)
val maxOffsetOpt = if (readOnlyCommitted)
  // when transactions are not in play, lastStableOffset should be None,
  // so what gets returned here is still initialHighWatermark
  Some(lastStableOffset.getOrElse(initialHighWatermark))
else
  None
val initialLogEndOffset = localReplica.logEndOffset.messageOffset // LEO
// this should be the start offset of the replica's current segments (the base offset of the
// first segment); it moves forward as log retention/cleaning deletes old data
val initialLogStartOffset = localReplica.logStartOffset
val fetchTimeMs = time.milliseconds // current time
val logReadInfo = localReplica.log match {
  case Some(log) =>
    // partitionFetchSize is the per-partition maxBytes from PartitionData, while limitBytes is the
    // request-level maxBytes of the whole FetchRequest; this partition's read may exceed neither
    val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
    // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
    // read from the Log object
    val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage, isolationLevel)
    // If the partition is being throttled, simply return an empty set.
    // when the quota is exceeded (throttled), an empty record set is returned
    if (shouldLeaderThrottle(quota, tp, replicaId))
      FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
    // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
    // progress in such cases and don't need to report a `RecordTooLargeException`
    else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete)
      FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
    else
      fetch // the normal result, which becomes the value of logReadInfo
  case None =>
    error(s"Leader for partition $tp does not have a local log")
    FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
}
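The per-partition outcome therefore boils down to three cases; a compact restatement with toy types (not Kafka code) for clarity:

object ReadOutcomeDemo {
  sealed trait ReadOutcome
  case object ThrottledEmpty  extends ReadOutcome // leader quota exceeded: return empty records
  case object IncompleteEmpty extends ReadOutcome // first batch too big, but the request version tolerates it: empty records
  case object NormalData      extends ReadOutcome // the data actually read from the log

  def classify(throttled: Boolean, hardMaxBytesLimit: Boolean, firstEntryIncomplete: Boolean): ReadOutcome =
    if (throttled) ThrottledEmpty
    else if (!hardMaxBytesLimit && firstEntryIncomplete) IncompleteEmpty
    else NormalData
}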
/**
 * Read messages from the log.
 *
 * @param startOffset where to fetch from, the fetchOffset parameter of the fetch request:
 *                    The offset to begin reading at
 * @param maxLength the fetch maxBytes minus the bytes already read:
 *                  The maximum number of bytes to read
 * @param maxOffset the upper bound of the fetch, i.e. the high watermark:
 *                  The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
 * @param minOneMessage whether to fetch at least one message even if its size exceeds maxBytes:
 *                      If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
 */
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
         isolationLevel: IsolationLevel): FetchDataInfo = {
  maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
    trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
    // Because we don't use lock for reading, the synchronization is a little bit tricky.
    // We create the local variables to avoid race conditions with updates to the log.
    // local variables avoid races with concurrent updates; nextOffsetMetadata.messageOffset is the LEO
    val currentNextOffsetMetadata = nextOffsetMetadata
    val next = currentNextOffsetMetadata.messageOffset
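This pattern is worth a standalone illustration: read the volatile reference once into a local, then derive every value from that single snapshot, so a concurrent append that swaps in a new nextOffsetMetadata cannot hand the reader two inconsistent views. The class and field names below are simplified stand-ins, not the real Log internals.

final case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

class SnapshotRead {
  @volatile private var nextOffsetMetadata = OffsetMetadata(100L, 0L, 4096)

  // an appender thread may replace nextOffsetMetadata at any time
  def advance(updated: OffsetMetadata): Unit = nextOffsetMetadata = updated

  def readView(): (Long, Int) = {
    val snapshot = nextOffsetMetadata // read the volatile exactly once
    // both values come from the same snapshot, even if an append happens between these two lines
    (snapshot.messageOffset, snapshot.relativePositionInSegment)
  }
}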
    // return error on attempt to read beyond the log end offset or read below log start offset
    // error handling: reading past the LEO is clearly wrong, and so is failing to find a matching LogSegment;
    // the startOffset < logStartOffset check looks redundant at first, but logStartOffset can be advanced
    // past the first segment's base offset (e.g. by a DeleteRecords request) without the segment being removed
    if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
      throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
        s"but we only have log segments in the range $logStartOffset to $next.")
    /**
     * Read from the segment whose baseOffset is less than the target offset; if that segment
     * yields no messages, keep moving to the following segments until some messages are read
     * or the end of the log is reached.
     */
    // Do the read on the segment with a base offset less than the target offset
    // but if that segment doesn't contain any messages with an offset greater than that
    // continue to read from successive segments until we get some messages or we reach the end of the log
    while (segmentEntry != null) {
      // take out the LogSegment
      val segment = segmentEntry.getValue
      // If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
      // the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
      // cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
      // end of the active segment.
      /**
       * Roughly: if segmentEntry is the last entry (the active segment), maxPosition is the exposed
       * byte position taken from nextOffsetMetadata; otherwise it is the size of the current segment.
       */
      val maxPosition = {
        if (segmentEntry == segments.lastEntry) {
          val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
          // is this check-again really useful, or just a bug workaround? it guards against a roll
          // happening between the two reads of segments.lastEntry: if a new segment has rolled,
          // this one is no longer active, so reading up to its file end is safe
          // Check the segment again in case a new segment has just rolled out.
          if (segmentEntry != segments.lastEntry)
            // New log segment has rolled out, we can read up to the file end.
            segment.size
          else
            exposedPos
        } else {
          segment.size
        }
      }
      /**
       * A summary of these parameters:
       * startOffset: where to start reading
       * maxOffset: upper bound of the read, the high watermark
       * maxLength: the maxBytes to read
       * maxPosition: the highest byte position in the segment file the read may reach (exposed LEO position or segment size)
       * minOneMessage: whether to read at least the first message
       */
      val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
      if (fetchInfo == null) {
        segmentEntry = segments.higherEntry(segmentEntry.getKey)
      } else {
        return isolationLevel match {
          // the default is READ_UNCOMMITTED; fetchInfo is returned as-is
          case IsolationLevel.READ_UNCOMMITTED => fetchInfo
          case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
        }
      }
    }
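To illustrate the maxPosition choice, here is a toy model with made-up names (SegmentView is not a Kafka class): the active segment is only readable up to the exposed position, while older, sealed segments are readable to their full size.

object MaxPositionDemo {
  final case class SegmentView(sizeInBytes: Long, isActive: Boolean)

  // exposedPos is the byte position taken from nextOffsetMetadata (end of the last fully published message)
  def maxReadablePosition(segment: SegmentView, exposedPos: Long): Long =
    if (segment.isActive) exposedPos else segment.sizeInBytes

  // an active 8192-byte segment whose last published offset ends at byte 7000:
  // maxReadablePosition(SegmentView(8192, isActive = true), 7000) == 7000, so the trailing 1192
  // bytes (already appended but not yet visible through nextOffsetMetadata) stay hidden from readers
}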
    // if the while loop above walks past the last segment without returning, the messages we wanted
    // to read have all been deleted, so an empty record set is returned
    // okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
    // this can happen when all messages with offset larger than start offsets have been deleted.
    // In this case, we will return the empty set with log end offset metadata
    FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
  }
}
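Behind segmentEntry, floorEntry and higherEntry, the segments live in a sorted map keyed by base offset. A small standalone sketch (made-up offsets and file names) of how the loop navigates that map:

import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupDemo extends App {
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]() // baseOffset -> segment file
  segments.put(0L, "00000000000000000000.log")
  segments.put(500L, "00000000000000000500.log")
  segments.put(1200L, "00000000000000001200.log")

  var entry = segments.floorEntry(734L)      // segment that may contain offset 734 -> baseOffset 500
  println(s"${entry.getKey} -> ${entry.getValue}")
  entry = segments.higherEntry(entry.getKey) // next segment to try if the read came back empty -> 1200
  println(s"${entry.getKey} -> ${entry.getValue}")
}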
/**
 * @param startOffset where to start reading:
 *                    A lower bound on the first offset to include in the message set we read
 * @param maxOffset upper bound of the read, the high watermark:
 *                  An optional maximum offset for the message set we read
 * @param maxSize the fetch maxBytes minus the bytes already read:
 *                The maximum number of bytes to include in the message set we read
 * @param maxPosition the highest byte position in the segment file the read may reach (exposed LEO position or segment size):
 *                    The maximum position in the log segment that should be exposed for read
 * @param minOneMessage whether to read at least the first message:
 *                      If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
 *
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
         minOneMessage: Boolean = false): FetchDataInfo = {
  // size of the log file in bytes
  val logSize = log.sizeInBytes // this may change, need to save a consistent copy
  // translate the offset into a file position via the index file
  val startOffsetAndSize = translateOffset(startOffset)
  val adjustedMaxSize =
    if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
    else maxSize
  // calculate the length of the message set to read based on whether or not they gave us a maxOffset
  val fetchSize: Int = maxOffset match {
    case None =>
      // no max offset, just read up to the max position
      min((maxPosition - startPosition).toInt, adjustedMaxSize)
    case Some(offset) =>
      val mapping = translateOffset(offset, startPosition)
      val endPosition =
        if (mapping == null)
          logSize // the max offset is off the end of the log, use the end of the file
        else
          mapping.position
      min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
  }
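A worked example of the two size clamps above, with made-up numbers: a 100 KiB per-partition budget, a 300 KiB batch sitting at startOffset, 900 KiB of exposed segment, and a high-watermark position at 700 KiB.

object FetchSizeDemo extends App {
  val maxSize        = 100 * 1024   // per-partition byte budget from the request
  val firstBatchSize = 300 * 1024   // size of the batch at startOffset
  val startPosition  = 0L           // file position of startOffset
  val maxPosition    = 900 * 1024L  // exposed bytes of the (active) segment
  val hwPosition     = 700 * 1024L  // file position of the high watermark (maxOffset)

  // minOneMessage = true lets the oversized first batch through instead of returning nothing
  val adjustedMaxSize = math.max(maxSize, firstBatchSize)                        // 307200
  val fetchSize = math.min(math.min(maxPosition, hwPosition) - startPosition,
                           adjustedMaxSize.toLong).toInt                         // 307200
  println(s"adjustedMaxSize=$adjustedMaxSize fetchSize=$fetchSize")
}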
// When this callback is triggered, the remote API call has completed.
// Record time before any byte-rate throttling.
request.apiRemoteCompleteTimeNanos = time.nanoseconds
var unconvertedFetchResponse: FetchResponse[Records] = null
// a fetch issued by a follower for replication
if (fetchRequest.isFromFollower) {
  // We've already evaluated against the quota and are good to go. Just need to record it now.
  unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
  val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
  quotas.leader.record(responseSize)
  trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
    s"metadata=${unconvertedFetchResponse.sessionId()}")
  sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
} else {
  // Fetch size used to determine throttle time is calculated before any down conversions.
  // This may be slightly different from the actual response size. But since down conversions
  // result in data being loaded into memory, we should do this only when we are not going to throttle.
  //
  // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
  // quotas have been violated. If both quotas have been violated, use the max throttle time between the two
  // quotas. When throttled, we unrecord the recorded bandwidth quota value
  val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
  if (maxThrottleTimeMs > 0) {
    // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
    // from the fetch quota because we are going to return an empty response.
    quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
    if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
      quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendResponse)
    } else {
      quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
    }
    // If throttling is required, return an empty response.
    unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
  } else {
    // Get the actual response. This will update the fetch context.
    // this is the key line: it creates the response object; full and incremental fetch sessions
    // build it differently, which will be covered together with FetchSession later
    unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
  }
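A small restatement of that decision (simplified return type, no real quota managers): both throttle times are computed, the channel is throttled by whichever quota demands the longer delay, and a throttled (empty) response replaces the data.

object ThrottleDecisionDemo {
  // returns which quota should throttle and for how long, or None when neither quota was violated
  def decideThrottle(bandwidthThrottleTimeMs: Int, requestThrottleTimeMs: Int): Option[(String, Int)] = {
    val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
    if (maxThrottleTimeMs <= 0) None // no violation: generate the real response
    else if (bandwidthThrottleTimeMs > requestThrottleTimeMs) Some(("fetch-quota", bandwidthThrottleTimeMs))
    else Some(("request-quota", requestThrottleTimeMs))
  }

  // decideThrottle(150, 40) == Some(("fetch-quota", 150))  -> quotas.fetch throttles for 150 ms
  // decideThrottle(0, 0)    == None                        -> fetchContext generates the normal response
}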
/**
 * A fairly simple method: maybeConvertFetchedData handles the message down-conversion required for
 * older protocol versions, then the bytes-out metric is recorded, and finally the FetchResponse is returned.
 */
def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
  // Down-convert messages for each partition if required
  val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
  unconvertedFetchResponse.responseData().asScala.foreach { case (tp, unconvertedPartitionData) =>
    if (unconvertedPartitionData.error != Errors.NONE)
      debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
        s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
    convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
  }
  // Prepare fetch response from converted data
  val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
    unconvertedFetchResponse.sessionId())
  response.responseData.asScala.foreach { case (topicPartition, data) =>
    // record the bytes out metrics only when the response is being sent
    brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
  }
  info(s"fetch response is ${response}")
  response
}
def updateConversionStats(send: Send): Unit = {
  send match {
    case send: MultiRecordsSend if send.recordConversionStats != null =>
      send.recordConversionStats.asScala.toMap.foreach { case (tp, stats) =>
        updateRecordConversionStats(request, tp, stats)
      }
    case _ =>
  }
}
}