// the callback for sending a produce response
// Nested method that defines the response callback; you can skip it on a first read
def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
  // ++ merges the maps
  val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
  var errorInResponse = false
  // Just log any per-partition error for now
  mergedResponseStatus.foreach { case (topicPartition, status) =>
    if (status.error != Errors.NONE) {
      errorInResponse = true
      debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
        request.header.correlationId,
        request.header.clientId,
        topicPartition,
        status.error.exceptionName))
    }
  }
  // Quota/throttling related code omitted
  // Send the response immediately. In case of throttling, the channel has already been muted.
  // acks=0 means the producer considers the request done once it reaches the broker,
  // without waiting for the replicas to write it
  if (produceRequest.acks == 0) {
    sendNoOpResponseExemptThrottle(request)
  } else {
    // response for acks=-1 or acks=1
    sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
  }
}
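For context, the acks value tested above is set on the producer side. A minimal client-side sketch, assuming the standard Kafka producer API (the bootstrap address and topic name are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
// acks=0: fire-and-forget, the broker takes the sendNoOpResponseExemptThrottle path above
// acks=1: leader ack only; acks=all (-1): wait for the in-sync replicas
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
producer.close()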
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
// hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
// i.e. drop the reference so GC can reclaim the records, since they have already been appended to the log
produceRequest.clearPartitionRecords()
}
// This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
// put in the purgatory (due to client throttling, it can take a while before the response is sent).
// Care should be taken in methods that use this field.
private volatile Map<TopicPartition, MemoryRecords> partitionRecords; // pending records for each partition
private boolean transactional = false; // whether the batches are transactional
private boolean idempotent = false;    // whether the batches are idempotent
}
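The transactional and idempotent flags are derived from the batches the producer sends. A minimal sketch of the client configuration that causes them to be set, assuming the standard producer API (the transactional.id and topic are placeholders):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// enable.idempotence=true makes the broker see idempotent batches
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
// setting a transactional.id additionally marks the batches as transactional
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id")

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
producer.beginTransaction()
producer.send(new ProducerRecord[String, String]("demo-topic", "k", "v"))
producer.commitTransaction()
producer.close()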
// Avoid writing to leader if there are not enough insync replicas to make it safe
// If the ISR size is below min.insync.replicas, fail the write; note that
// min.insync.replicas only takes effect together with acks=-1
if (inSyncSize < minIsr && requiredAcks == -1) {
  throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
    .format(topicPartition, inSyncSize, minIsr))
}
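min.insync.replicas can be overridden per topic. A minimal sketch of creating such a topic with AdminClient, assuming a 3-broker cluster (names and values are placeholders); once the ISR of this topic drops below 2, producers using acks=-1 hit the NotEnoughReplicasException above:

import java.util.Collections
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

val props = new java.util.Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val admin = AdminClient.create(props)

// replication.factor=3 with min.insync.replicas=2: acks=-1 writes fail
// once fewer than 2 replicas are in sync
val topic = new NewTopic("demo-topic", 3, 3.toShort)
  .configs(Map("min.insync.replicas" -> "2").asJava)

admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()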
// After the append, try to complete any delayed Fetch requests that may now be satisfiable,
// e.g. a consumer waiting until fetch.min.bytes of data is available
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))

// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderReplica))
}
}
// some delayed operations may be unblocked after HW changed
if (leaderHWIncremented)
  tryCompleteDelayedRequests()
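tryCompleteDelayedFetch ties back to the consumer side: a fetch sits in the purgatory until enough bytes have accumulated or the wait time expires. A minimal sketch of the relevant consumer settings, assuming the standard consumer API (values are placeholders):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
// the broker holds the fetch in purgatory until at least 1 KB is available ...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024")
// ... or 500 ms have passed, whichever comes first
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")

val consumer = new KafkaConsumer[String, String](props)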
if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
  for (batch <- validRecords.batches.asScala) {
    // each record batch must not exceed max.message.bytes
    if (batch.sizeInBytes > config.maxMessageSize) {
      throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
        s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
    }
  }
}
}
// update the epoch cache with the epoch stamped onto the message by the leader
validRecords.batches.asScala.foreach { batch =>
  if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
    _leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
}
// check whether the message set size exceeds config.segmentSize
// the whole MemoryRecords being appended must not exceed segment.bytes
if (validRecords.sizeInBytes > config.segmentSize) {
  throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
    s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
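Note the two size checks work at different granularities: max.message.bytes bounds a single record batch (the earlier check), while segment.bytes bounds the whole MemoryRecords being appended. A standalone sketch of the same two checks, with hypothetical limits:

// hypothetical limits mirroring max.message.bytes and segment.bytes
val maxMessageSize = 1024 * 1024        // 1 MB per batch
val segmentSize    = 1024 * 1024 * 1024 // 1 GB per segment

def checkSizes(batchSizes: Seq[Int]): Unit = {
  // per-batch limit, analogous to the max.message.bytes check
  batchSizes.foreach { batchSize =>
    if (batchSize > maxMessageSize)
      throw new IllegalArgumentException(s"batch of $batchSize bytes exceeds max.message.bytes")
  }
  // whole-append limit, analogous to the segment.bytes check
  if (batchSizes.sum > segmentSize)
    throw new IllegalArgumentException(s"append of ${batchSizes.sum} bytes exceeds segment.bytes")
}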
// maybe roll the log if this segment is full
// decide whether a new segment needs to be rolled; the exact conditions are covered below
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
// MemoryRecords
info(s"MemoryRecords is ${records}")
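MemoryRecords is the producer's wire format: a byte buffer holding one or more record batches. A minimal sketch of building one in memory and reading the per-batch fields the loop below relies on (magic, baseOffset, lastOffset, sizeInBytes, CRC), assuming Kafka's clients library is on the classpath:

import scala.collection.JavaConverters._
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}

val records = MemoryRecords.withRecords(CompressionType.NONE,
  new SimpleRecord("k1".getBytes, "v1".getBytes),
  new SimpleRecord("k2".getBytes, "v2".getBytes))

records.batches.asScala.foreach { batch =>
  // the same per-batch fields the validation loop below reads
  println(s"magic=${batch.magic} baseOffset=${batch.baseOffset} " +
    s"lastOffset=${batch.lastOffset} sizeInBytes=${batch.sizeInBytes}")
  batch.ensureValid() // CRC check, throws if the batch is corrupt
}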
for (batch <- records.batches.asScala) {
  // we only validate V2 and higher to avoid potential compatibility issues with older clients
  if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
    throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
      s"be 0, but it is ${batch.baseOffset}")
  // update the first offset if on the first message. For magic versions older than 2, we use the last offset
  // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
  // For magic version 2, we can get the first offset directly from the batch header.
  // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
  // case, validation will be more lenient.
  // Also indicate whether we have the accurate first offset or not
  // readFirstMessage is only there to capture the offsets of the first batch
  if (!readFirstMessage) {
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
      firstOffset = Some(batch.baseOffset)
    lastOffsetOfFirstBatch = batch.lastOffset
    readFirstMessage = true
  }
  // check that offsets are monotonically increasing
  if (lastOffset >= batch.lastOffset)
    monotonic = false
  // update the last offset seen
  lastOffset = batch.lastOffset
  // Check if the message sizes are valid.
  val batchSize = batch.sizeInBytes
  if (batchSize > config.maxMessageSize) {
    brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
    brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
    throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
      s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
  }
  // check the validity of the message by checking CRC
  batch.ensureValid()