前言
在AbstractCoordinator的initiateJoinGroup方法中,通过判断joinFuture为null,发起了JoinGroupRequest请求,本文主要讲解GroupCoordinator对该请求的处理。同样的,源码分为客户端发起请求时的参数,broker端的处理过程,以及consumer对响应的处理
发起请求
请求的发送代码在AbstractCoordinator的sendJoinGroupRequest方法在,方法比较简单,这里说点简单之外的事情
- 首先确保已知coordinator节点,才能向它发起请求
- generation.memberId初始化时为””
- protocolType=”consumer”
- rebalanceTimeoutMs就是max.poll.interval.ms,这个结论可以从KafkaConsumer初始化ConsumerCoordinator得到
- 4中的rebalanceTimeoutMs也是不最终客户端请求的超时时间,这里源码作者额外增加了5s
1 | // 省略部分代码 |
请求的格式的json形式如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17{
"groupId": "test-group",
"sessionTimeout": 30000,
"memberId": "",
"protocolType": "consumer",
"groupProtocols": [
{
"name": "range", // PartitionAssignor的name
"metadata": {
"version": 0,
"topic": "foo,bar", // 订阅的topic
"user_data": null // 通常为null
}
}
],
"rebalanceTimeout": 10000
}
GoupCoordinator处理请求
JoinGroupRequest由GoupCoordinator所在的broker处理,入口方法为handleJoinGroupRequest,下面的源码省略了认证相关,可以看出该方法做了2件事:定义响应回调,调用handleJoinGroup方法
1 | def handleJoinGroupRequest(request: RequestChannel.Request) { |
handleJoinGroup
handleJoinGroup的核心逻辑是校验和调用doJoinGroup,关于校验这里说2点
- groupId不能为null,也不能是””
- sessionTimeoutMs默认必须在6000-300000 即6s-5min之间,当然你也可以修改group.min.session.timeout.ms,group.max.session.timeout.ms来调整区间
1 | def handleJoinGroup(groupId: String, |
doJoinGroup
核心方法都在doJoinGroup方法中,此处省略了许多校验的代码,而group的状态此时为Empty,我们直接看该条件分支即可
此时memberId为空,也就是JoinGroupRequest.UNKNOWN_MEMBER_ID,因此这里仅调用addMemberAndRebalance方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43private def doJoinGroup(group: GroupMetadata,
memberId: String,
clientId: String,
clientHost: String,
rebalanceTimeoutMs: Int,
sessionTimeoutMs: Int,
protocolType: String,
protocols: List[(String, Array[Byte])],
responseCallback: JoinCallback) {
group.currentState match {
case Dead =>
responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
// 省略...
case CompletingRebalance =>
// 省略...
case Empty | Stable =>
if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
// if the member id is unknown, register the member to the group
addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType,
protocols, group, responseCallback)
} else {
val member = group.get(memberId)
if (group.isLeader(memberId) || !member.matches(protocols)) {
updateMemberAndRebalance(group, member, protocols, responseCallback)
} else {
responseCallback(JoinGroupResult(
members = Map.empty,
memberId = memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE))
}
}
}
if (group.is(PreparingRebalance))
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
addMemberAndRebalance
addMemberAndRebalance首先初始化了memberId,可以看到是clientId拼接一个UUID,然后封装成了一个MemberMetadata对象,这是组成员的元信息对象,之后添加到GroupMetadata中
注意这里的回调函数传给了awaitingJoinCallback变量,rebalance的处理在maybePrepareRebalance中
1 | private def addMemberAndRebalance(rebalanceTimeoutMs: Int, |
add方法很简单,但要关注leaderId的赋值,它表示第一个consumer就是消费者组的leader,也就是第一个consumer为消费者组的leader member1
2
3
4
5
6
7
8
9
10
11
12def add(member: MemberMetadata) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
assert(groupId == member.groupId)
assert(this.protocolType.orNull == member.protocolType)
assert(supportsProtocols(member.protocols))
if (leaderId.isEmpty)
leaderId = Some(member.memberId) // 来的第一个就是leader ...
members.put(member.memberId, member) // memberId为key MemberMetadata为value
}
maybePrepareRebalance
maybePrepareRebalance仅仅是做了一个判断:当前组状态是Stable, CompletingRebalance, Empty其中之一,才可以开始rebalance,满足条件就调用prepareRebalance
prepareRebalance方法在第一个consumer入组时创建一个InitialDelayedJoin,它会等待group.initial.rebalance.delay.ms
这个参数也是为消费者启动时的rebalance优化,因为每启动一个consumer都相当于加入一个组成员,需要进行一次rebalance,这无疑很浪费,这里等待一段时间再开始PreparingRebalance
之后的消费者创建的是DelayedJoin,到期时间就是rebalanceTimeoutMs,即max.poll.interval.ms
此时group的状态由Empty转换为了PreparingRebalance
1 | private def maybePrepareRebalance(group: GroupMetadata) { |
延迟join
joinPurgatory可以理解为一个延迟队列,那么直接看InitialDelayedJoin的onComplete方法,大概意思就是在group.initial.rebalance.delay.ms时间内,它会一直等待消费者入组,超时后后调用父类的onComplete,而InitialDelayedJoin的父类是DelayedJoin,它的onComplete会调用GroupCoordinator的onCompleteJoin方法
1 | private[group] class InitialDelayedJoin(coordinator: GroupCoordinator, |
延迟任务完成处理
GroupCoordinator的onCompleteJoin方法源码如下,它的大概意思是响应每一个consumer的请求,这里主要关注JoinGroupResult的第一个参数:leader member的元信息,是leader时才返回currentMemberMetadata(所有组成员的元信息),也就是说GroupCoordinator只告诉了leader consumer组成员的元信息,原因在下文会揭晓1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46def onCompleteJoin(group: GroupMetadata) {
group.inLock {
// remove any members who haven't joined the group yet
group.notYetRejoinedMembers.foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
// TODO: cut the socket connection to the client
}
if (!group.is(Dead)) {
group.initNextGeneration()
if (group.is(Empty)) {
info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
groupManager.storeGroup(group, Map.empty, error => {
if (error != Errors.NONE) {
warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
}
})
} else {
info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
for (member <- group.allMemberMetadata) {
assert(member.awaitingJoinCallback != null)
val joinResult = JoinGroupResult(
members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata
} else {
Map.empty
},
memberId = member.memberId,
generationId = group.generationId,
subProtocol = group.protocolOrNull,
leaderId = group.leaderOrNull,
error = Errors.NONE)
member.awaitingJoinCallback(joinResult)
member.awaitingJoinCallback = null
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
}
}
这里还要关注一下initNextGeneration方法,generationId+1好理解,selectProtocol是因为每个consumer的分配策略可能不一样,selectProtocol用于投票选举一个PartitionAssignor
最后要关注的一点是组状态由PreparingRebalance转变为了CompletingRebalance,也就是所有消费者都进入组内了,等待GroupCoordinator分配分区
1 | def initNextGeneration() = { |
调用回调函数
上面调用的awaitingJoinCallback,其实就是sendResponseCallback,最终的响应返回值JoinGroupResponse如下1
2
3
4
5
6
7
8
9def sendResponseCallback(joinResult: JoinGroupResult) {
val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
}
客户端Consumer处理响应
consumer端处理响应的原理:在最开始的sendJoinGroupRequest方法中,除了发送请求,还定义了响应的处理器,我们只关注Errors为NONE的情况,主要做了2件事
- 初始化了generation,它可以理解为rebalance的次数,版本号
- 根据server返回的leader memberId判断,如果当前consumer就是leader,调用onJoinLeader,否则调用onJoinFollower
1 | private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> { |
onJoinLeader和onJoinFollower都是发送了一个SyncGroupRequest请求,唯一的区别是,onJoinLeader会计算分配方案,传给SyncGroupRequest请求,而onJoinFollower传入的是一个emptyMap
1 | private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { |
总结
过程分析
消费者发送JoinGroupRequest请求的主要作用是向GoupCoordinator上报的订阅信息,而GoupCoordinator处理的核心逻辑就是addMemberAndRebalance,具体是封装消费者组成员的信息为MemberMetadata,将其添加到GroupMetadata中,
并在此时确定第一个消费者为leader,之后的操作可以理解为将rebalance操作封装成一个DelayedJoin任务,放入延迟队列中,此时消费者组状态由Empty转变为PreparingRebalance,在延迟任务完成时,才返回给客户响应,响应的主要内容主要是leader member的元信息,消费者组的元信息。
客户端(consumer)接收到响应后,主要查看broker返回的leader memberId是不是就是自己,如果是,调用onJoinLeader,它会按分区分配算法计算每个消费者的分区,并再次发送一个SyncGroupRequest请求;相反,如果自己不是leader member,调用的是onJoinFollower,虽然它也发送了SyncGroupRequest请求,但是它的分配方案是空的。
那么为什么要发送一个空的SyncGroupRequest呢? 这是因为GoupCoordinator只认可leader member的分配方案,其他consumer发送空的SyncGroupRequest只是为了让GoupCoordinator返回leader member的分配方案,即对非leader member的consumer来说,空的SyncGroupRequest不是重点,该请求的响应里包含的分区分配才是重点
难点分析
JoinGroupRequest最难理解的地方是对InitialDelayedJoin和DelayedJoin的理解,InitialDelayedJoin在早期的kafka源码并不存在,是后来考虑到项目启动时会触发多次rebalance,因此在kafka的server.properties配置文件中最后一行配置:group.initial.rebalance.delay.ms,设置一定的延迟时间是有意义的,而在改时间超时后,会触发父类DelayedJoin的onComplete,它会调用GroupCoordinator的onCompleteJoin,这里面才会返回给所有consumer JoinGroupRequest请求的响应