本文从消费者拉取消息开始分析消费流程,但kafka并不是单纯的在poll方法中拉取消息,鉴于消费者组的存在,以及Rebalance动作,使整个消费流程的复杂度直线上升,因此需要比生产者花费更多的章节去讲解
准备
为了方便大家阅读源码,这里先对源码中经常出现的部分做一个解释,提示大家的阅读效率
名词解释
elapsedTime:已用时间,在一个带有超时时间的方法中,该变量用于记录部分已完成操作的已用时间,比如超时时间60s,其中访问数据库操作用了10s,那么elapsedTime就是10s
发送请求的一般模式
consumer向broker发送请求的一般模式是:
- sendXxxRequest表示发生一个请求,通常返回一个RequestFuture
- RequestFuture有几个方法,isDone表示请求结束,即获取到了broker端的响应,相反的表示无响应;succeeded表示请求成功,failed表示失败;可以对future注册一个Listener,执行成功和失败的回调
- Listen通常是一个xxxResponseHandler,常见的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14RequestFuture future = client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new xxxResponseHandler());
if (future.succeeded()) {
// 成功
} else {
// 失败
if (是可重试异常)
continue;
else if (!future.isRetriable())
throw exception;
// 重试的back off
time.sleep(retryBackoffMs);
}
consumer订阅
consumer订阅topic有3中方式:指定topic集合,指定topic正则,手动指定分区。前2中称之为AutoAssigned,因为是coordinator自动分配给消费者的,这三种方式分别对应下面3个api1
2
3
4
5public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern);
public void assign(Collection<TopicPartition> partitions)
本文只讨论第一种,这也是我们开发中最常用的订阅方式
poll方法
首先说下2个参数:timeoutMs和includeMetadataInTimeout
- timeoutMs:整个poll调用的超时时间,第一次poll里面向broker发送了4个请求,该参数建议设置大于3s,
- includeMetadataInTimeout:针对上面的超时时间,是否应该包含获取元数据的时间(向broker请求)
1 | public ConsumerRecords<K, V> poll(final Duration timeout) { |
不用关注太多poll方法细节,仅关注超时时间内的循环,简单理解为3步:第一步为updateAssignmentMetadataIfNeeded,然后是pollForFetches,最后将拦截器处理后的消息返回给用户,大致的流程如下:
updateAssignmentMetadataIfNeeded
updateAssignmentMetadataIfNeeded方法十分复杂,逻辑也很长,我这里直接说它的逻辑,让读者心里有个底。该方法主要做了3件事:
- 初始化Coordinator,主要是节点信息(id,ip,port)
- 初次rebalance,consumer启动时进入消费者组
- 初始化每个分区的last consumed position,表示该消费者组上次消费到哪个位移了,Coordinator会缓存每个group最后消费的位移
- 如果第3步获取不到,则根据auto.offset.reset获取
其次它的返回值是一个boolean,它在以下情况返回false:
- coordinator unknown
- rebalance失败(长时间拿不到响应结果,发生不可重试的异常)
- 获取不到分区的last consumed position (fetch offset)
这里再科普一些知识点,Coordinator,即消费者组协调器,每一个broker启动时都初始化了一个GroupCoordinator对象,它负责消费者组的生命周期管理,以及消费者组,消费者组成员的元数据管理
而每个分区的last consumed position是指消费者每次poll,准确的说应该是发起fetch请求向broker拉取数据的时候,都要传递一个fetchOffset参数,表示从哪里开始拉消息
但也有一些特殊情况,比如消费者组过期被删除了,新消费者组第一次拉取时,此时coordinator没有该消费者组的信息,没法返回该消费者组上次消费的分区位移,那么auto.offset.reset就起作用了,coordinator会根据该配置返回相应的offset
1 | boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) { |
updateAssignmentMetadataIfNeeded分为2部分:coordinator.poll和updateFetchPositions,前者是rebalance的核心步骤,需要重点关注
coordinator#poll
该方法位于ConsumerCoordinator类中,虽然源码看上去也不少(已删除部分),但在消费者组已稳定(stable)的情况下,执行到下面这行代码就会返回了:1
pollHeartbeat(currentTime)
pollHeartbeat会尝试查看是否到了心跳时间,来发起心跳,同时还记录了一个lastPoll变量,它与maxPollIntervalMs参数息息相关,如果两次poll的间隔超出了maxPollIntervalMs,心跳线程会主动发起LeaveGroup请求,让consumer主动离开消费者组,触发一次rebalance,这也是大部分人看到的rebalance异常,因为业务逻辑处理的太慢,导致rebalance的原因
1 | public boolean poll(final long timeoutMs) { |
上面说的是消费者组已稳定的情况,那么在消费者启动时,相当于消费者组中新加入了一个成员,必然会触发一次rebalance,我称之为初始rebalance,此时consumer并不知道coordinator是哪台broker(coordinatorUnknown),就会发起一次FindCoordinator请求,来初始化AbstractCoordinator.coordinator,此处的源码分析在kafka消费者-获取Coordinator一文
在获取到Coordinator之后,进入下一个if,rejoinNeededOrPending方法初始化为true,接下里的ensureActiveGroup就是初始rebalance的核心步骤
开始rebalance
ensureActiveGroup源码如下:1
2
3
4
5
6
7
8
9
10
11
12
13boolean ensureActiveGroup(long timeoutMs, long startMs) {
// 前面已经获取到了Coordinator,这里确认一下
if (!ensureCoordinatorReady(timeoutMs)) {
return false;
}
startHeartbeatThreadIfNeeded(); // 启动心跳线程
// join开始时间,和剩余的超时时间
long joinStartMs = time.milliseconds();
long joinTimeoutMs = remainingTimeAtLeastZero(timeoutMs, joinStartMs - startMs);
return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
ensureActiveGroup会启动心跳线程,但并不会开始心跳,因为enabled参数默认为false,并利用线程的等待唤醒机制,让心跳线程在wait处等待1
2
3
4if (!enabled) {
AbstractCoordinator.this.wait();
continue;
}
rebalance核心逻辑都在joinGroupIfNeeded方法中
joinGroupIfNeeded
这里我们关注下onJoinPrepare,它会回调ConsumerRebalanceListener的onPartitionsRevoked方法,而之后就是典型的客户端发送请求模式,只需要关注initiateJoinGroup方法即可
1 | boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) { |
initiateJoinGroup
initiateJoinGroup中的sendJoinGroupRequest同样是客户端发送请求模式的一种,可以看到在rebalance成功后,做了以下3件事
- MemberState置为stable
- rejoinNeeded置为false,它是退出外层循环的标志位
- 启动心跳线程
而JoinGroupRequest的详细细节,请参考我的另外2篇文章kafka-rebalance之JoinGroup和kafka-rebalance之SyncGroup,里面完整的讲述了rebalance细节1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
// 先暂停了心跳线程,其实本来就还没启动
disableHeartbeatThread();
state = MemberState.REBALANCING;
joinFuture = sendJoinGroupRequest();
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
public void onSuccess(ByteBuffer value) {
// handle join completion in the callback so that the callback will be invoked
// even if the consumer is woken up before finishing the rebalance
synchronized (AbstractCoordinator.this) {
log.info("Successfully joined group with generation {}", generation.generationId);
// 跟新2个很重要的
state = MemberState.STABLE; // 消费者Stable
rejoinNeeded = false; // 退出外层循环的标志位
// 前面停止的心跳线程也重新启动了
if (heartbeatThread != null)
heartbeatThread.enable();
}
}
public void onFailure(RuntimeException e) {
// we handle failures below after the request finishes. if the join completes
// after having been woken up, the exception is ignored and we will rejoin
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED;
}
}
});
}
return joinFuture;
}
该方法结束后,方法会层层返回到updateAssignmentMetadataIfNeeded,此时coordinator.poll已结束,接下来是updateFetchPositions方法1
2
3
4
5
6
7
8
9
10boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
final long startMs = time.milliseconds();
// 返回false表示获取coordinator位置,初始化rebalance失败 (正则订阅暂不考虑)
if (!coordinator.poll(timeoutMs)) {
return false;
}
// 返回true,更新要fetch的Position
return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
预备知识
TopicPartitionState表示consumer在消费过程中的状态,它会在每一个拉取后更新,里面的参数都比较简单,不再细说1
2
3
4
5
6
7
8
9TopicPartitionState {
private Long position; // last consumed position
private Long highWatermark; // the high watermark from last fetch
private Long logStartOffset; // the log start offset
private Long lastStableOffset;
private boolean paused; // whether this partition has been paused by the user
private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
private Long nextAllowedRetryTimeMs;
}
updateFetchPositions
这里首先判断了所有订阅的分区是否有last consumed position,它用于下一次消息拉取,consumer要从什么位置开始拉,初始化时为null,那么就会向coordinator发起OFFSET_FETCH请求,用于初始化TopicPartitionState的position,
但还有coordinator没有消费者组上次消费位置元数据的情况,比如消费者组过期,被管理员删除,第一次建立时,那么该如何初始化position呢?
答案是auto.offset.reset,根据重置offset策略,向分区leader所在的broker,注意不是coordinator,发送LIST_OFFSETS请求来初始化position,该请求的详细处理过程请参考Kafka消费者-ListOffsets请求
1 | private boolean updateFetchPositions(final long timeoutMs) { |
上半部分总结
本文主要从大家平时见到的poll方法开始分析,并在一开始就普及了源码中的难点,poll方法从流程图上看十分简单,主要分为:updateAssignmentMetadataIfNeeded,pollForFetches,返回消息给用户这三步,本文主要分析第一步就已花费了很多篇幅,由于内容过长,将一些核心逻辑放在单独的文章中分析:获取Coordinator, rebalance之JoinGroup, rebalance之SyncGroup, ListOffsets请求。
这些请求都是在consumer第一次拉取消息之前的准备工作,首先consumer要知道Coordinator的信息,并保证与之连接通畅。之后便开始了初次入组的rebalance,其中又可细分为入组,等待其他组员(非必需),选举leader consumer,然后leader consumer根据分区策略制定分配方案,所有组员再次发送SyncGroup请求,由Coordinator来返回leader consumer制定的分配方案。
在有了分配方案之后,并不能立即开始拉取消息,因为consumer不知道每一个分区从哪里开始拉取,就要通过OffsetFetch请求向Coordinator获取fetchOffset,在有了fetchOffset之后理应可以拉取了,但又有2个特殊情况:当前是新消费者组,或是消费者组过期了(相关参数为offsets.retention.minutes),此时Coordinator不知道consumer上一次消费到哪了,那么auto.offset.reset参数就起作用了,根据是它来获取最早或是最新的位移,到此,准备工作才算完成。