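Preface
This article looks at how a consumer obtains the offset it last committed.
The OffsetFetch Request
At the end of the article "Kafka Consumer: Source Code Analysis (Part 1)", the Consumer sends this request from the refreshCommittedOffsetsIfNeeded method. The request fetches the offset the consumer last committed, which then becomes the fetchOffset parameter of the next fetch request.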
```java
public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) {
    // Partitions that do not have a valid fetch position yet
    final Set<TopicPartition> missingFetchPositions = subscriptions.missingFetchPositions();
    // Send OffsetFetch to the GroupCoordinator and wait up to timeoutMs for the result
    final Map<TopicPartition, OffsetAndMetadata> offsets = fetchCommittedOffsets(missingFetchPositions, timeoutMs);
    // null means the request did not complete within the timeout
    if (offsets == null) return false;
    for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
        final TopicPartition tp = entry.getKey();
        final long offset = entry.getValue().offset();
        // Store the committed offset as this partition's position
        this.subscriptions.seek(tp, offset);
    }
    return true;
}
```
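During initialization, the Consumer therefore stores each partition's last committed offset in the position field of TopicPartitionState (through subscriptions.seek). I explained the meaning of each field of this class in the previous article.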
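```java
private static class TopicPartitionState {
    private Long position;                     // last consumed position; set by seek()
    private Long highWatermark;                // high watermark from the last fetch
    private Long logStartOffset;               // log start offset
    private Long lastStableOffset;             // last stable offset (used by read_committed consumers)
    private boolean paused;                    // whether the user has paused this partition
    private OffsetResetStrategy resetStrategy; // strategy to use if the offset needs resetting
    private Long nextAllowedRetryTimeMs;       // earliest time a pending offset reset may be retried
}
```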
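The following self-contained Java sketch shows what refreshCommittedOffsetsIfNeeded does with the OffsetFetch result on the client side. It is not Kafka's code. `PartitionState`, `SubscriptionSketch` and the `Function` that stands in for fetchCommittedOffsets are hypothetical simplifications. Partitions are plain "topic-partition" strings, and only the position field of TopicPartitionState is modeled.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for TopicPartitionState: only the position field is modeled.
final class PartitionState {
    Long position; // null until a committed offset is applied
}

// Hypothetical stand-in for SubscriptionState, keyed by "topic-partition" strings.
final class SubscriptionSketch {
    private final Map<String, PartitionState> assignment = new LinkedHashMap<>();

    void assign(String tp) {
        assignment.put(tp, new PartitionState());
    }

    // Like missingFetchPositions(): assigned partitions that still have no position.
    Set<String> missingFetchPositions() {
        Set<String> missing = new HashSet<>();
        for (Map.Entry<String, PartitionState> e : assignment.entrySet()) {
            if (e.getValue().position == null) {
                missing.add(e.getKey());
            }
        }
        return missing;
    }

    // Like seek(): overwrite the position of an assigned partition.
    void seek(String tp, long offset) {
        PartitionState state = assignment.get(tp);
        if (state == null) {
            throw new IllegalStateException("No current assignment for partition " + tp);
        }
        state.position = offset;
    }

    Long position(String tp) {
        return assignment.get(tp).position;
    }

    // Mirrors the control flow of refreshCommittedOffsetsIfNeeded: a null result
    // (the request timed out) returns false and leaves every position untouched.
    boolean refreshCommittedOffsetsIfNeeded(Function<Set<String>, Map<String, Long>> fetchCommittedOffsets) {
        Map<String, Long> offsets = fetchCommittedOffsets.apply(missingFetchPositions());
        if (offsets == null) return false;
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            seek(entry.getKey(), entry.getValue());
        }
        return true;
    }
}
```

A partition that is absent from the result simply keeps no position. In the real consumer, the auto.offset.reset policy takes over at that point.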
Source Code
The request is handled in KafkaApis#handleOffsetFetchRequest. Earlier Kafka versions read offsets from ZooKeeper, and that part of the code is omitted here.
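```scala
def handleOffsetFetchRequest(request: RequestChannel.Request) {
  val header = request.header
  val offsetFetchRequest = request.body[OffsetFetchRequest]
  // Excerpt: the group authorization check, the ZooKeeper path, the local helper
  // authorizeTopicDescribe and the enclosing createResponse(requestThrottleMs: Int)
  // callback that supplies requestThrottleMs are omitted.

  // Split the requested partitions by whether the client may Describe their topic
  val (authorizedPartitions, unauthorizedPartitions) =
    offsetFetchRequest.partitions.asScala.partition(authorizeTopicDescribe)
  // Only the authorized partitions are looked up by the GroupCoordinator
  val (error, authorizedPartitionData) =
    groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId, Some(authorizedPartitions))
  // A group-level error is returned for the whole request
  if (error != Errors.NONE)
    offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
  else {
    // Unauthorized partitions get an error entry instead of an offset
    val unauthorizedPartitionData =
      unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
    new OffsetFetchResponse(requestThrottleMs, Errors.NONE,
      (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
  }
}
```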
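The authorization split in handleOffsetFetchRequest can be sketched in plain Java, under simplifying assumptions. This is not the broker's code. `OffsetFetchSketch`, `PartitionResult`, the `UNAUTHORIZED` marker and the predicate and lookup functions are hypothetical. Errors are plain strings instead of Kafka's `Errors` enum, and the group-level error path is left out. The sketch shows one thing: unauthorized partitions never reach the coordinator. They get an error entry and are merged back into the response next to the authorized results.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical per-partition result: an offset plus an error name ("NONE" on success).
final class PartitionResult {
    final long offset;
    final String error;

    PartitionResult(long offset, String error) {
        this.offset = offset;
        this.error = error;
    }
}

final class OffsetFetchSketch {
    // Hypothetical marker playing the role of OffsetFetchResponse.UNAUTHORIZED_PARTITION.
    static final PartitionResult UNAUTHORIZED = new PartitionResult(-1L, "TOPIC_AUTHORIZATION_FAILED");

    static Map<String, PartitionResult> handle(
            List<String> requested,
            Predicate<String> authorizeTopicDescribe,
            Function<List<String>, Map<String, PartitionResult>> coordinatorFetch) {
        // Like Scala's Seq.partition: true -> authorized, false -> unauthorized.
        Map<Boolean, List<String>> split = requested.stream()
                .collect(Collectors.partitioningBy(authorizeTopicDescribe));
        // Only the authorized partitions are passed to the coordinator lookup.
        Map<String, PartitionResult> response = new HashMap<>(coordinatorFetch.apply(split.get(true)));
        // Merge in an error entry for every unauthorized partition.
        for (String tp : split.get(false)) {
            response.put(tp, UNAUTHORIZED);
        }
        return response;
    }
}
```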
The core logic therefore lives in groupCoordinator.handleFetchOffsets:
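```scala
def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
    (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
    // This broker cannot serve the group right now (e.g. NOT_COORDINATOR or
    // COORDINATOR_LOAD_IN_PROGRESS): return the error with no partition data
    case Some(error) => error -> Map.empty
    // Otherwise read the offsets from the GroupMetadataManager's cache
    case None => (Errors.NONE, groupManager.getOffsets(groupId, partitions))
  }
}
```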
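handleFetchOffsets is a status guard followed by a cache lookup. The minimal Java sketch below shows that shape. It assumes a hypothetical `CoordinatorStatus` with three flags and a `Function` in place of groupManager.getOffsets. The error strings reuse the names of Kafka's `Errors` constants (`COORDINATOR_NOT_AVAILABLE`, `COORDINATOR_LOAD_IN_PROGRESS`, `NOT_COORDINATOR`). The classes, fields and check order are illustrative assumptions, not the broker's validateGroupStatus.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical snapshot of the coordinator's state for one group.
final class CoordinatorStatus {
    final boolean active;       // the coordinator is running
    final boolean loadingGroup; // the group's offsets partition is still being loaded
    final boolean ownsGroup;    // this broker is the coordinator for the group

    CoordinatorStatus(boolean active, boolean loadingGroup, boolean ownsGroup) {
        this.active = active;
        this.loadingGroup = loadingGroup;
        this.ownsGroup = ownsGroup;
    }
}

// Hypothetical (top-level error, per-partition offsets) pair, like the Scala tuple.
final class FetchOffsetsResult {
    final String error;
    final Map<String, Long> offsets;

    FetchOffsetsResult(String error, Map<String, Long> offsets) {
        this.error = error;
        this.offsets = offsets;
    }
}

final class FetchOffsetsSketch {
    // Returns an error name when the group cannot be served here, otherwise empty.
    static Optional<String> validateGroupStatus(CoordinatorStatus s) {
        if (!s.active) return Optional.of("COORDINATOR_NOT_AVAILABLE");
        if (s.loadingGroup) return Optional.of("COORDINATOR_LOAD_IN_PROGRESS");
        if (!s.ownsGroup) return Optional.of("NOT_COORDINATOR");
        return Optional.empty();
    }

    static FetchOffsetsResult handleFetchOffsets(
            CoordinatorStatus status, Function<String, Map<String, Long>> getOffsets, String groupId) {
        Optional<String> error = validateGroupStatus(status);
        if (error.isPresent()) {
            // Corresponds to `case Some(error) => error -> Map.empty`
            return new FetchOffsetsResult(error.get(), Collections.emptyMap());
        }
        // Corresponds to `case None => (Errors.NONE, groupManager.getOffsets(...))`
        return new FetchOffsetsResult("NONE", getOffsets.apply(groupId));
    }
}
```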
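The source of getOffsets is shown below. It first looks up the group in groupMetadataCache by groupId. It then reads from the group's `offsets` field, the in-memory cache of committed offsets per partition, through group.offset and group.allOffsets. The field's type is Map[TopicPartition, CommitRecordMetadataAndOffset].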
```scala
def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
  val group = groupMetadataCache.get(groupId)
  if (group == null) {
    // omitted in this excerpt: unknown group, each requested partition gets OffsetFetchResponse.INVALID_OFFSET
  } else {
    group.inLock {
      if (group.is(Dead)) {
        // omitted in this excerpt: same INVALID_OFFSET answer as for an unknown group
      } else {
        topicPartitionsOpt match {
          // No partition list: return every offset the group has committed
          case None =>
            group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
              topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
            }
          // Explicit partition list: look up each partition in the group's offsets cache
          case Some(topicPartitions) =>
            topicPartitions.map { topicPartition =>
              val partitionData = group.offset(topicPartition) match {
                case None =>
                  // omitted in this excerpt: no committed offset, answered with INVALID_OFFSET
                case Some(offsetAndMetadata) =>
                  new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
              }
              topicPartition -> partitionData
            }.toMap
        }
      }
    }
  }
}
```
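The lookup in getOffsets reduces to a two-level map: group id to the group's offsets cache, then partition to the committed offset. The Java sketch below models this under stated assumptions. `OffsetCacheSketch`, `commit` and `getOffsets` are hypothetical names. The cache stores a bare `long` instead of a CommitRecordMetadataAndOffset, and locking and the Dead state are left out. A missing group or partition is answered with -1, the value of OffsetFetchResponse.INVALID_OFFSET.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class OffsetCacheSketch {
    static final long INVALID_OFFSET = -1L; // "no committed offset" placeholder

    // groupId -> (topic-partition -> last committed offset). Stands in for
    // groupMetadataCache plus each group's offsets map.
    private final Map<String, Map<String, Long>> groups = new HashMap<>();

    // Hypothetical write path, only here so there is something to fetch.
    void commit(String groupId, String tp, long offset) {
        groups.computeIfAbsent(groupId, g -> new HashMap<>()).put(tp, offset);
    }

    // Optional.empty() plays the role of topicPartitionsOpt = None ("all partitions").
    Map<String, Long> getOffsets(String groupId, Optional<List<String>> partitionsOpt) {
        Map<String, Long> groupOffsets = groups.get(groupId);
        Map<String, Long> result = new LinkedHashMap<>();
        if (!partitionsOpt.isPresent()) {
            // Every offset the group has committed (empty if the group is unknown).
            if (groupOffsets != null) result.putAll(groupOffsets);
            return result;
        }
        for (String tp : partitionsOpt.get()) {
            Long offset = groupOffsets == null ? null : groupOffsets.get(tp);
            // Unknown group or no commit for this partition -> INVALID_OFFSET.
            result.put(tp, offset == null ? INVALID_OFFSET : offset);
        }
        return result;
    }
}
```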
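Summary
This request is a preparatory step the consumer takes before its first fetch: it finds out where consumption last stopped. The GroupCoordinator looks up, in its cached offsets map, the offset this consumer group last committed for each requested partition. When the Consumer receives the response, it stores that offset in the position field of TopicPartitionState, where it becomes the fetchOffset parameter of the next Fetch request.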