// Some code omitted
def handleProduceRequest(request: RequestChannel.Request) {
  // Convert the request into the concrete request object
  val produceRequest = request.body[ProduceRequest]
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes
  val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
  // the callback for sending a produce response
  // Nested method defining the response callback; it can be skipped on a first read
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
    // ++ merges the maps
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    var errorInResponse = false
    // Only log the per-partition errors here
    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.error != Errors.NONE) {
        errorInResponse = true
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          status.error.exceptionName))
      }
    }
    // Quota/throttling code omitted
    // Send the response immediately. In case of throttling, the channel has already been muted.
    // acks=0 means the producer considers the send complete once the request reaches the broker,
    // without waiting for any replica to write the data
    // (a producer-side sketch of the three acks modes follows this listing)
    if (produceRequest.acks == 0) {
      sendNoOpResponseExemptThrottle(request)
    } else {
      // response for acks=-1 or acks=1
      sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
    }
  }
  // Only the __admin_client client may write to internal topics such as __consumer_offsets
  val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId
  // if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
  // hence we clear its data here in order to let GC reclaim its memory since it is already appended to log
  produceRequest.clearPartitionRecords()
}
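The three acknowledgement modes handled above are chosen on the producer side. Below is a minimal client-side sketch, assuming a broker at localhost:9092 and a made-up topic name (neither appears in the source above):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

object AcksDemo extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  // acks=0: return as soon as the request is sent, no broker acknowledgement
  // acks=1: wait for the leader append; acks=all (-1): wait for the in-sync replicas
  props.put(ProducerConfig.ACKS_CONFIG, "all")

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"))
  producer.close()
}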
// This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
// put in the purgatory (due to client throttling, it can take a while before the response is sent).
// Care should be taken in methods that use this field.
private volatile Map<TopicPartition, MemoryRecords> partitionRecords; // records pending append, per partition
private boolean transactional = false; // transactional produce
private boolean idempotent = false;    // idempotent produce
}
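The transactional and idempotent flags above end up set when the producer enables those features. A sketch of the client-side configuration that would turn them on, again assuming a local broker and a made-up transactional.id:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

object IdempotentProducerDemo extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer")
  // enable.idempotence=true: batches carry a producer id and sequence numbers
  props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
  // setting a transactional.id additionally marks the batches as transactional
  props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-id")

  val producer = new KafkaProducer[String, String](props)
  producer.initTransactions()
  producer.close()
}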
// Get the Partition whose leader replica is local for the current topic-partition
val (partition, _) = getPartitionAndLeaderReplicaIfLocal(topicPartition)
val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks)
def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0): LogAppendInfo = {
  // inReadLock is a curried function whose second parameter list takes a function; the block below
  // returns the LogAppendInfo together with a boolean saying whether the HW was incremented.
  // In effect it runs the body under a read lock.
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    // leaderReplicaIfLocal is the leader replica if it lives on this local broker
    leaderReplicaIfLocal match {
      // if it exists locally
      case Some(leaderReplica) =>
        // Get the Log object from the Replica
        val log = leaderReplica.log.get
        // the min.insync.replicas config
        val minIsr = log.config.minInSyncReplicas
        // size of the ISR (a Set[Replica])
        val inSyncSize = inSyncReplicas.size
        // Avoid writing to leader if there are not enough insync replicas to make it safe
        // If the ISR size is below min.insync.replicas, fail the append. Note that min.insync.replicas
        // only takes effect together with acks=-1 (a config sketch follows this listing)
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException("Number of insync replicas for partition %s is [%d], below required minimum [%d]"
            .format(topicPartition, inSyncSize, minIsr))
        }
        // The actual append is delegated to the Log object
        val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
        // After the write, try to complete delayed Fetch requests, e.g. ones whose fetch.min.bytes
        // requirement can now be satisfied
        replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))

        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderReplica))
    }
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
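The NotEnoughReplicasException path above only fires for acks=-1 producers, and the threshold is the topic's min.insync.replicas. A sketch of raising that config with the AdminClient, assuming a local broker and a made-up topic name (alterConfigs is used for brevity; newer clients prefer incrementalAlterConfigs):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object MinIsrConfigDemo extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address

  val admin = AdminClient.create(props)
  // With acks=-1 on the producer, appends to this topic fail with NotEnoughReplicasException
  // once the ISR shrinks below 2
  val resource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic")
  val config = new Config(Collections.singleton(new ConfigEntry("min.insync.replicas", "2")))
  admin.alterConfigs(Collections.singletonMap(resource, config)).all().get()
  admin.close()
}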
private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
  maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
    val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

    // return if we have no valid messages or if this is a duplicate of the last appended entry
    if (appendInfo.shallowCount == 0)
      return appendInfo

    // trim any invalid bytes or partial messages before appending it to the on-disk log
    var validRecords = trimInvalidBytes(records, appendInfo)
    lock synchronized {
      // assignOffsets is hard-coded to true on this call path, so the else branch is skipped here
      if (assignOffsets) {
        // assign offsets to the message set
        val offset = new LongRef(nextOffsetMetadata.messageOffset)
        // firstOffset is re-assigned here, to the next offset of this log
        appendInfo.firstOffset = Some(offset.value)
        val now = time.milliseconds
        // run the various validations and assign offsets
        val validateAndOffsetAssignResult = LogValidator.validateMessagesAndAssignOffsets(validRecords,
          offset,
          time,
          now,
          appendInfo.sourceCodec,
          appendInfo.targetCodec,
          config.compact,
          config.messageFormatVersion.recordVersion.value,
          config.messageTimestampType,
          config.messageTimestampDifferenceMaxMs,
          leaderEpoch,
          isFromClient)
        if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
          for (batch <- validRecords.batches.asScala) {
            // No single batch may be larger than max.message.bytes
            if (batch.sizeInBytes > config.maxMessageSize) {
              throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to " +
                s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
            }
          }
        }
      }
      // update the epoch cache with the epoch stamped onto the message by the leader
      validRecords.batches.asScala.foreach { batch =>
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
          _leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
      }
      // check whether the message set size exceeds config.segmentSize
      // The total size of the MemoryRecords may not exceed segment.bytes
      if (validRecords.sizeInBytes > config.segmentSize) {
        throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
          s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
      }
      // maybe roll the log if this segment is full
      // Decide whether a new segment needs to be created; the exact conditions are covered below
      val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
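Both limits checked above come from topic-level (or broker default) configuration: config.maxMessageSize maps to max.message.bytes and bounds a single batch, while config.segmentSize maps to segment.bytes and bounds a log segment. A sketch of creating a topic with explicit values for both, assuming a local broker and made-up topic settings:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

object TopicSizeConfigDemo extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address

  val admin = AdminClient.create(props)
  val topic = new NewTopic("demo-topic", 3, 1.toShort).configs(Map(
    "max.message.bytes" -> "1048588",   // per-batch limit, enforced via RecordTooLargeException
    "segment.bytes"     -> "1073741824" // per-segment limit, enforced via RecordBatchTooLargeException
  ).asJava)
  admin.createTopics(Collections.singleton(topic)).all().get()
  admin.close()
}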
private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
  var shallowMessageCount = 0
  var validBytesCount = 0
  var firstOffset: Option[Long] = None
  var lastOffset = -1L
  var sourceCodec: CompressionCodec = NoCompressionCodec
  var monotonic = true
  var maxTimestamp = RecordBatch.NO_TIMESTAMP
  var offsetOfMaxTimestamp = -1L
  var readFirstMessage = false
  var lastOffsetOfFirstBatch = -1L
  // MemoryRecords
  info(s"MemoryRecords is ${records}")
  for (batch <- records.batches.asScala) {
    // we only validate V2 and higher to avoid potential compatibility issues with older clients
    if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
      throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
        s"be 0, but it is ${batch.baseOffset}")
    // update the first offset if on the first message. For magic versions older than 2, we use the last offset
    // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
    // For magic version 2, we can get the first offset directly from the batch header.
    // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
    // case, validation will be more lenient.
    // Also indicate whether we have the accurate first offset or not
    // readFirstMessage simply captures the data of the first batch
    if (!readFirstMessage) {
      if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
        firstOffset = Some(batch.baseOffset)
      lastOffsetOfFirstBatch = batch.lastOffset
      readFirstMessage = true
    }
    // check that offsets are monotonically increasing: each batch's last offset must be greater
    // than the previous batch's (see the standalone sketch after this listing)
    if (lastOffset >= batch.lastOffset)
      monotonic = false
    // update the last offset seen
    lastOffset = batch.lastOffset
    // Check if the message sizes are valid.
    val batchSize = batch.sizeInBytes
    if (batchSize > config.maxMessageSize) {
      brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
      brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
      throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
        s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
    }
    // check the validity of the message by checking CRC
    batch.ensureValid()
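The per-batch bookkeeping above (monotonicity of offsets, CRC validation via ensureValid) can be reproduced against a standalone MemoryRecords instance. A small sketch, assuming nothing beyond the public org.apache.kafka.common.record classes:

import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import scala.collection.JavaConverters._

object BatchValidationDemo extends App {
  // Build a single in-memory batch; offsets are assigned starting at 0
  val records = MemoryRecords.withRecords(CompressionType.NONE,
    new SimpleRecord("k1".getBytes, "v1".getBytes),
    new SimpleRecord("k2".getBytes, "v2".getBytes))

  var lastOffset = -1L
  var monotonic = true
  for (batch <- records.batches.asScala) {
    // same monotonicity rule as above: each batch's lastOffset must be strictly greater than the previous
    if (lastOffset >= batch.lastOffset) monotonic = false
    lastOffset = batch.lastOffset
    // CRC check, as in the broker code
    batch.ensureValid()
  }
  println(s"batches are monotonic: $monotonic, last offset: $lastOffset")
}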