Preface
This post looks at how a consumer obtains the offset it last committed.
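The OffsetFetch request
At the end of the article Kafka Consumer: Source Code Analysis (Part 1), the Consumer issues this request from the refreshCommittedOffsetsIfNeeded method. Its purpose is to obtain the offset the consumer last committed, which then serves as the fetchOffset parameter of the next fetch request.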
public boolean refreshCommittedOffsetsIfNeeded(final long timeoutMs) { ... }
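When the Consumer initializes, it saves the last committed offset of each partition into the position field of TopicPartitionState. I translated the meaning of each field of that class in the previous article.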
TopicPartitionState { ... }
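To make the role of position concrete, here is a minimal sketch of the idea, not the client's actual code. The class names PositionSeedingSketch and PartitionStateSketch, the method seedPositions, and the use of a plain String in place of TopicPartition are all assumptions made for illustration. It only shows that a partition without a position is seeded from the committed offset, and that the next fetch starts from that position.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a consumer's subscription state.
class PositionSeedingSketch {
    // Partition name -> state. A String key stands in for TopicPartition.
    private final Map<String, PartitionStateSketch> assigned = new HashMap<>();

    void assign(String partition) {
        assigned.put(partition, new PartitionStateSketch());
    }

    // Seeds every partition that has no position yet with its committed offset.
    // Returns true when all assigned partitions have a position afterwards
    // (this return convention belongs to the sketch only).
    boolean seedPositions(Map<String, Long> committedOffsets) {
        boolean allSeeded = true;
        for (Map.Entry<String, PartitionStateSketch> entry : assigned.entrySet()) {
            PartitionStateSketch state = entry.getValue();
            Long committed = committedOffsets.get(entry.getKey());
            if (state.position == null && committed != null) {
                state.position = committed;
            }
            if (state.position == null) {
                allSeeded = false;
            }
        }
        return allSeeded;
    }

    // The offset the next fetch for this partition would start from.
    long nextFetchOffset(String partition) {
        PartitionStateSketch state = assigned.get(partition);
        if (state == null || state.position == null) {
            throw new IllegalStateException("No position known for " + partition);
        }
        return state.position;
    }

    public static void main(String[] args) {
        PositionSeedingSketch consumer = new PositionSeedingSketch();
        consumer.assign("orders-0");
        Map<String, Long> committed = new HashMap<>();
        committed.put("orders-0", 42L);
        consumer.seedPositions(committed);
        System.out.println("next fetchOffset for orders-0: " + consumer.nextFetchOffset("orders-0"));
    }
}

// Stand-in for TopicPartitionState; only the position field is modeled.
class PartitionStateSketch {
    Long position; // null until a starting offset is known
}
```

In this sketch an existing position is never overwritten, so a later refresh cannot move a partition that is already being consumed.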
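Source code
The request enters the broker at KafkaApis#handleOffsetFetchRequest. Earlier versions of Kafka fetched the offsets from ZooKeeper; that part of the code is omitted here.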
def handleOffsetFetchRequest(request: RequestChannel.Request) {
  val header = request.header
  val offsetFetchRequest = request.body[OffsetFetchRequest]
  val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala.partition(authorizeTopicDescribe)
  // The core logic
  val (error, authorizedPartitionData) = groupCoordinator.handleFetchOffsets(offsetFetchRequest.groupId,
    Some(authorizedPartitions))
  if (error != Errors.NONE)
    offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
  else {
    val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
    new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
  }
}
As the code shows, the core entry point is groupCoordinator.handleFetchOffsets.
def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None): ...
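The shape of handleOffsetFetchRequest above (split the requested partitions by authorization, ask the coordinator only about the authorized ones, then merge both groups into one response) can be sketched independently of Kafka. Everything named here is hypothetical: OffsetFetchHandlerSketch, the handle method, the UNAUTHORIZED marker, and the String-based partitions and results. The coordinator is passed in as a function so the sketch stays self-contained, and the error branch of the original is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the split / look up / merge flow.
class OffsetFetchHandlerSketch {
    // Marker returned for partitions the caller is not allowed to describe.
    static final String UNAUTHORIZED = "UNAUTHORIZED";

    static Map<String, String> handle(List<String> requestedPartitions,
                                      Predicate<String> canDescribe,
                                      Function<List<String>, Map<String, String>> coordinatorLookup) {
        // Step 1: split the request into authorized and unauthorized partitions.
        List<String> authorized = new ArrayList<>();
        List<String> unauthorized = new ArrayList<>();
        for (String partition : requestedPartitions) {
            if (canDescribe.test(partition)) {
                authorized.add(partition);
            } else {
                unauthorized.add(partition);
            }
        }
        // Step 2: only the authorized partitions reach the coordinator.
        Map<String, String> response = new HashMap<>(coordinatorLookup.apply(authorized));
        // Step 3: unauthorized partitions are answered with a marker, then merged in.
        for (String partition : unauthorized) {
            response.put(partition, UNAUTHORIZED);
        }
        return response;
    }

    public static void main(String[] args) {
        Map<String, String> result = handle(
                Arrays.asList("orders-0", "secret-0"),
                partition -> !partition.startsWith("secret"),
                authorized -> {
                    Map<String, String> offsets = new HashMap<>();
                    for (String partition : authorized) {
                        offsets.put(partition, "offset=42");
                    }
                    return offsets;
                });
        System.out.println("orders-0 -> " + result.get("orders-0"));
        System.out.println("secret-0 -> " + result.get("secret-0"));
    }
}
```

One consequence of this shape is that the response always contains an entry for every requested partition, so the client can tell a missing permission apart from a missing commit.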
The source of getOffsets is shown below. In short, it reads from offsets, the cache of each partition's committed record, whose type is Map[TopicPartition, CommitRecordMetadataAndOffset].
def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
  val group = groupMetadataCache.get(groupId)
  if (group == null) {
    // Abnormal case: INVALID_OFFSET ...
  } else {
    group.inLock {
      if (group.is(Dead)) {
        // Abnormal case: INVALID_OFFSET ...
      } else {
        topicPartitionsOpt match {
          case None =>
            // Return offsets for all partitions owned by this consumer group. (this only applies to consumers
            // that commit offsets to Kafka.)
            group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
              topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
            }
          case Some(topicPartitions) =>
            topicPartitions.map { topicPartition =>
              // Not read from the __consumer_offsets topic: the group caches the last committed offset
              // (loaded on the first LeaderAndIsr request)
              val partitionData = group.offset(topicPartition) match {
                case None =>
                  // Abnormal case: INVALID_OFFSET ...
                case Some(offsetAndMetadata) =>
                  new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
              }
              topicPartition -> partitionData
            }.toMap
        }
      }
    }
  }
}
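The lookup rules in getOffsets can be mirrored in a small, self-contained model. This is a sketch under stated assumptions rather than the broker's code. GroupOffsetCacheSketch, commit, markDead, and the String partition keys are hypothetical. The sentinel value -1 for INVALID_OFFSET is an assumption of the sketch. So is the behaviour for an unknown or dead group, whose branches are elided in the excerpt above: here every requested partition simply gets the sentinel. Locking is reduced to synchronized methods.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical in-memory model of the coordinator's committed-offset cache.
class GroupOffsetCacheSketch {
    // Sentinel for "no committed offset"; the concrete value is an assumption of this sketch.
    static final long INVALID_OFFSET = -1L;

    // groupId -> (partition -> last committed offset)
    private final Map<String, Map<String, Long>> committedByGroup = new HashMap<>();
    private final Set<String> deadGroups = new HashSet<>();

    synchronized void commit(String groupId, String partition, long offset) {
        committedByGroup.computeIfAbsent(groupId, g -> new HashMap<>()).put(partition, offset);
    }

    synchronized void markDead(String groupId) {
        deadGroups.add(groupId);
    }

    // Empty Optional: return everything cached for the group.
    // Present Optional: return one entry per requested partition.
    synchronized Map<String, Long> getOffsets(String groupId, Optional<List<String>> partitionsOpt) {
        Map<String, Long> cached = committedByGroup.get(groupId);
        boolean unusable = cached == null || deadGroups.contains(groupId);
        Map<String, Long> result = new HashMap<>();
        if (!partitionsOpt.isPresent()) {
            if (!unusable) {
                result.putAll(cached);
            }
            return result;
        }
        for (String partition : partitionsOpt.get()) {
            Long offset = unusable ? null : cached.get(partition);
            // A partition with no cached commit is answered with the sentinel, not an exception.
            result.put(partition, offset == null ? INVALID_OFFSET : offset);
        }
        return result;
    }

    public static void main(String[] args) {
        GroupOffsetCacheSketch cache = new GroupOffsetCacheSketch();
        cache.commit("group-a", "orders-0", 42L);
        Map<String, Long> offsets =
                cache.getOffsets("group-a", Optional.of(Arrays.asList("orders-0", "orders-1")));
        System.out.println("orders-0 -> " + offsets.get("orders-0"));
        System.out.println("orders-1 -> " + offsets.get("orders-1"));
    }
}
```

The point the sketch tries to capture is that the answer comes entirely from memory: nothing is read from the offsets topic at request time.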
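Summary
This post covers a preparatory step the consumer takes before its first fetch: finding the position it last consumed up to. The GroupCoordinator looks up, in its cached offsets Map, the offset this consumer group last committed for the partition. On receiving the response, the Consumer stores it in the position field of TopicPartitionState, where it serves as the fetchOffset parameter of the next Fetch request.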