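Preface
The initiateJoinGroup method of AbstractCoordinator sends a JoinGroupRequest when joinFuture is null. This article explains how GroupCoordinator handles that request. As usual, the source walkthrough has three parts: the parameters the client sends, the processing on the broker, and the consumer's handling of the response.
Sending the request
The request is sent from the sendJoinGroupRequest method of AbstractCoordinator. The method itself is simple, so here are a few points that are less obvious:
1. The coordinator node must be known before a request can be sent to it.
2. generation.memberId is initialized to "".
3. protocolType = "consumer".
4. rebalanceTimeoutMs is max.poll.interval.ms, as can be seen where KafkaConsumer initializes ConsumerCoordinator.
5. The rebalanceTimeoutMs in item 4 is not the final timeout of the client request: the source adds an extra 5 s on top of it.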
    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (joinFuture == null) {
            joinFuture = sendJoinGroupRequest();
        }
        return joinFuture;
    }

    RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();

        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                groupId,
                this.sessionTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);

        log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

        int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
        return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
                .compose(new JoinGroupResponseHandler());
    }
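The timeout computation in the last lines of sendJoinGroupRequest can be tried in isolation. The sketch below is a minimal standalone illustration; the class name and the constant name are made up for this example and are not Kafka identifiers. A plausible reason for wrapping the sum in Math.max is overflow protection: if rebalanceTimeoutMs is close to Integer.MAX_VALUE, the addition wraps to a negative number and the plain rebalance timeout is used instead.

```java
final class JoinTimeoutSketch {
    // Extra slack added on top of the rebalance timeout (hypothetical constant name).
    static final int EXTRA_JOIN_SLACK_MS = 5000;

    // Mirrors: Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000)
    static int joinGroupTimeoutMs(int rebalanceTimeoutMs) {
        // When the int addition overflows, the sum is negative and
        // Math.max falls back to rebalanceTimeoutMs itself.
        return Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + EXTRA_JOIN_SLACK_MS);
    }

    public static void main(String[] args) {
        System.out.println(joinGroupTimeoutMs(300_000));           // normal case
        System.out.println(joinGroupTimeoutMs(Integer.MAX_VALUE)); // overflow case
    }
}
```

With max.poll.interval.ms set to 300000, the request timeout in this sketch is 305000 ms.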
In JSON form, the request looks like this:
    {
      "groupId": "test-group",
      "sessionTimeout": 30000,
      "memberId": "",
      "protocolType": "consumer",
      "groupProtocols": [
        {
          "name": "range",
          "metadata": {
            "version": 0,
            "topic": "foo,bar",
            "user_data": null
          }
        }
      ],
      "rebalanceTimeout": 10000
    }
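How GroupCoordinator handles the request
The JoinGroupRequest is handled by the broker that hosts the GroupCoordinator, and the entry point is handleJoinGroupRequest. The source below omits the authorization code. The method does two things: it defines the response callback and calls handleJoinGroup.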
    def handleJoinGroupRequest(request: RequestChannel.Request) {
      val joinGroupRequest = request.body[JoinGroupRequest]

      def sendResponseCallback(joinResult: JoinGroupResult) {
        val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
        def createResponse(requestThrottleMs: Int): AbstractResponse = {
          val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
            joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)

          trace("Sending join group response %s for correlation id %d to client %s."
            .format(responseBody, request.header.correlationId, request.header.clientId))
          responseBody
        }
        sendResponseMaybeThrottle(request, createResponse)
      }

      val protocols = joinGroupRequest.groupProtocols().asScala.map(protocol =>
        (protocol.name, Utils.toArray(protocol.metadata))).toList
      groupCoordinator.handleJoinGroup(
        joinGroupRequest.groupId,
        joinGroupRequest.memberId,
        request.header.clientId,
        request.session.clientAddress.toString,
        joinGroupRequest.rebalanceTimeout,
        joinGroupRequest.sessionTimeout,
        joinGroupRequest.protocolType,
        protocols,
        sendResponseCallback)
    }
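handleJoinGroup
The core of handleJoinGroup is validation followed by a call to doJoinGroup. Two points about the validation:
1. groupId must be neither null nor "".
2. By default sessionTimeoutMs must be between 6000 and 300000, that is, between 6 s and 5 min. The range can be changed with group.min.session.timeout.ms and group.max.session.timeout.ms.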
    def handleJoinGroup(groupId: String,
                        memberId: String,
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
      validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
        responseCallback(joinError(memberId, error))
        return
      }

      if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
        sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
        responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
      } else {
        groupManager.getGroup(groupId) match {
          case None =>
            if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
            } else {
              val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
              doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
            }

          case Some(group) =>
            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
      }
    }
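The session timeout check is a plain closed-range test. The following Java sketch restates it outside the broker, assuming the default bounds quoted above (6000 and 300000 ms); the class and method names are invented for illustration.

```java
final class SessionTimeoutCheck {
    // Defaults quoted in the text for group.min.session.timeout.ms
    // and group.max.session.timeout.ms.
    static final int GROUP_MIN_SESSION_TIMEOUT_MS = 6_000;
    static final int GROUP_MAX_SESSION_TIMEOUT_MS = 300_000;

    // Returns true when the broker-side check would accept the value;
    // otherwise the broker answers with INVALID_SESSION_TIMEOUT.
    static boolean isValid(int sessionTimeoutMs) {
        return sessionTimeoutMs >= GROUP_MIN_SESSION_TIMEOUT_MS
                && sessionTimeoutMs <= GROUP_MAX_SESSION_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        System.out.println(isValid(30_000));  // value used in the sample request
        System.out.println(isValid(1_000));   // below the lower bound
    }
}
```

Both bounds are inclusive, matching the `<` and `>` comparisons in handleJoinGroup.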
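doJoinGroup
The core logic lives in doJoinGroup. Much of the validation code is omitted here. The group is in the Empty state at this point, so we can go straight to that branch.
memberId is empty at this point, i.e. JoinGroupRequest.UNKNOWN_MEMBER_ID, so only addMemberAndRebalance is called.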
    private def doJoinGroup(group: GroupMetadata,
                            memberId: String,
                            clientId: String,
                            clientHost: String,
                            rebalanceTimeoutMs: Int,
                            sessionTimeoutMs: Int,
                            protocolType: String,
                            protocols: List[(String, Array[Byte])],
                            responseCallback: JoinCallback) {
      group.currentState match {
        case Dead =>
          responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
        case PreparingRebalance =>
          // omitted in this excerpt
        case CompletingRebalance =>
          // omitted in this excerpt
        case Empty | Stable =>
          if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
          } else {
            val member = group.get(memberId)
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              responseCallback(JoinGroupResult(
                members = Map.empty,
                memberId = memberId,
                generationId = group.generationId,
                subProtocol = group.protocolOrNull,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }
          }
      }

      if (group.is(PreparingRebalance))
        joinPurgatory.checkAndComplete(GroupKey(group.groupId))
    }
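addMemberAndRebalance
addMemberAndRebalance first generates the memberId, which is the clientId joined with a UUID. It then wraps the member in a MemberMetadata object, the metadata object for a group member, and adds it to the GroupMetadata.
Note that the callback is stored in the awaitingJoinCallback field. The rebalance itself is handled in maybePrepareRebalance.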
    private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                      sessionTimeoutMs: Int,
                                      clientId: String,
                                      clientHost: String,
                                      protocolType: String,
                                      protocols: List[(String, Array[Byte])],
                                      group: GroupMetadata,
                                      callback: JoinCallback) = {
      val memberId = clientId + "-" + group.generateMemberIdSuffix
      val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
        sessionTimeoutMs, protocolType, protocols)
      member.awaitingJoinCallback = callback

      if (group.is(PreparingRebalance) && group.generationId == 0)
        group.newMemberAdded = true

      group.add(member)
      maybePrepareRebalance(group)
      member
    }
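To make the shape of a generated memberId concrete, here is a small Java sketch. It assumes, as described above, that the suffix is a random UUID; the class and method names are hypothetical and only java.util.UUID is used.

```java
import java.util.UUID;

final class MemberIdSketch {
    // clientId + "-" + random UUID, following the description of
    // addMemberAndRebalance in the text.
    static String newMemberId(String clientId) {
        return clientId + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Prints something of the form consumer-1-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
        System.out.println(newMemberId("consumer-1"));
    }
}
```

Because the suffix is random, a consumer that restarts with the same clientId still receives a different memberId.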
The add method is simple, but note the assignment to leaderId: the first consumer to be added becomes the leader member of the consumer group.
    def add(member: MemberMetadata) {
      if (members.isEmpty)
        this.protocolType = Some(member.protocolType)

      assert(groupId == member.groupId)
      assert(this.protocolType.orNull == member.protocolType)
      assert(supportsProtocols(member.protocols))

      if (leaderId.isEmpty)
        leaderId = Some(member.memberId)
      members.put(member.memberId, member)
    }
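The "first member wins" rule can be reproduced with a few lines of Java. This is a simplified sketch, not the broker's GroupMetadata: it keeps only a member map and an optional leaderId, and all names are chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class GroupSketch {
    // memberId -> clientHost; stands in for the MemberMetadata map.
    private final Map<String, String> members = new LinkedHashMap<>();
    private Optional<String> leaderId = Optional.empty();

    void add(String memberId, String clientHost) {
        // Only the first member to arrive sets the leader.
        if (!leaderId.isPresent()) {
            leaderId = Optional.of(memberId);
        }
        members.put(memberId, clientHost);
    }

    boolean isLeader(String memberId) {
        return leaderId.isPresent() && leaderId.get().equals(memberId);
    }

    int size() {
        return members.size();
    }
}
```

For example, after `add("c1", "/10.0.0.1")` followed by `add("c2", "/10.0.0.2")`, `isLeader("c1")` is true and `isLeader("c2")` is false.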
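maybePrepareRebalance
maybePrepareRebalance only performs a check: a rebalance may start only when the group state is one of Stable, CompletingRebalance, or Empty. If the check passes, it calls prepareRebalance.
When the first consumer joins, prepareRebalance creates an InitialDelayedJoin, which waits for group.initial.rebalance.delay.ms.
This parameter optimizes the rebalance that happens while consumers are starting up. Every consumer that starts adds a group member and would otherwise require its own rebalance, which is wasteful, so the coordinator waits for a while before completing the PreparingRebalance phase.
For consumers that join later, a DelayedJoin is created instead, and it expires after rebalanceTimeoutMs, i.e. max.poll.interval.ms.
At this point the group state changes from Empty to PreparingRebalance.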
    private def maybePrepareRebalance(group: GroupMetadata) {
      group.inLock {
        if (group.canRebalance)
          prepareRebalance(group)
      }
    }

    private def prepareRebalance(group: GroupMetadata) {
      if (group.is(CompletingRebalance))
        resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

      val delayedRebalance = if (group.is(Empty))
        new InitialDelayedJoin(this,
          joinPurgatory,
          group,
          groupConfig.groupInitialRebalanceDelayMs,
          groupConfig.groupInitialRebalanceDelayMs,
          max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
      else
        new DelayedJoin(this, group, group.rebalanceTimeoutMs)

      group.transitionTo(PreparingRebalance)

      val groupKey = GroupKey(group.groupId)
      joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
    }
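The canRebalance check described above amounts to a set-membership test on the group state. The Java sketch below models it with an enum; the enum constants and the class name are illustrative stand-ins for the broker's Scala state objects, and the allowed set is taken directly from the text.

```java
import java.util.EnumSet;
import java.util.Set;

final class RebalanceGate {
    enum GroupState { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // States from which a transition to PREPARING_REBALANCE is allowed,
    // as listed in the text: Stable, CompletingRebalance, Empty.
    private static final Set<GroupState> CAN_REBALANCE_FROM =
            EnumSet.of(GroupState.STABLE, GroupState.COMPLETING_REBALANCE, GroupState.EMPTY);

    static boolean canRebalance(GroupState current) {
        return CAN_REBALANCE_FROM.contains(current);
    }

    public static void main(String[] args) {
        for (GroupState s : GroupState.values()) {
            System.out.println(s + " -> " + canRebalance(s));
        }
    }
}
```

A group that is already in PreparingRebalance is rejected by this test, so a second joining consumer does not start a second rebalance.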
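Delayed join
joinPurgatory can be thought of as a delay queue, so let's look directly at the onComplete method of InitialDelayedJoin. Roughly, it keeps waiting for consumers to join within group.initial.rebalance.delay.ms, and once that time is up it calls the parent class's onComplete. The parent of InitialDelayedJoin is DelayedJoin, whose onComplete calls the onCompleteJoin method of GroupCoordinator.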
    private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                            purgatory: DelayedOperationPurgatory[DelayedJoin],
                                            group: GroupMetadata,
                                            configuredRebalanceDelay: Int,
                                            delayMs: Int,
                                            remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {

      override def tryComplete(): Boolean = false

      override def onComplete(): Unit = {
        group.inLock {
          if (group.newMemberAdded && remainingMs != 0) {
            group.newMemberAdded = false
            val delay = min(configuredRebalanceDelay, remainingMs)
            val remaining = max(remainingMs - delayMs, 0)
            purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
              purgatory,
              group,
              configuredRebalanceDelay,
              delay,
              remaining
            ), Seq(GroupKey(group.groupId)))
          } else
            super.onComplete()
        }
      }
    }

    private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                     group: GroupMetadata,
                                     rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

      override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
      override def onExpiration() = coordinator.onExpireJoin()
      override def onComplete() = coordinator.onCompleteJoin(group)
    }
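The delay and remaining arithmetic in InitialDelayedJoin is easier to follow with numbers. The Java sketch below replays that arithmetic under one simplifying assumption: a new member joins during every waiting window, so the operation is rescheduled until remainingMs reaches 0. The class and method names are invented, and the 3000 ms delay and 10000 ms rebalance timeout are sample values.

```java
import java.util.ArrayList;
import java.util.List;

final class InitialDelaySchedule {
    // Returns the successive waits, assuming newMemberAdded is true
    // every time onComplete runs.
    static List<Integer> delays(int configuredDelayMs, int rebalanceTimeoutMs) {
        if (configuredDelayMs <= 0) {
            throw new IllegalArgumentException("this sketch requires a positive delay");
        }
        List<Integer> waited = new ArrayList<>();
        // Initial values as passed by prepareRebalance.
        int delayMs = configuredDelayMs;
        int remainingMs = Math.max(rebalanceTimeoutMs - configuredDelayMs, 0);
        while (true) {
            waited.add(delayMs);
            if (remainingMs == 0) {
                break; // corresponds to super.onComplete()
            }
            // Same formulas as in InitialDelayedJoin.onComplete.
            int nextDelay = Math.min(configuredDelayMs, remainingMs);
            int nextRemaining = Math.max(remainingMs - delayMs, 0);
            delayMs = nextDelay;
            remainingMs = nextRemaining;
        }
        return waited;
    }

    public static void main(String[] args) {
        System.out.println(delays(3000, 10000)); // [3000, 3000, 3000, 1000]
    }
}
```

In this scenario the waits add up to exactly the rebalance timeout, so even a steady stream of new members cannot postpone the join phase beyond rebalanceTimeoutMs.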
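Completing the delayed task
The source of GroupCoordinator's onCompleteJoin is shown below. Roughly, it responds to the pending request of every consumer. The thing to watch is the first argument of JoinGroupResult, members: currentMemberMetadata (the metadata of all group members) is returned only when the member is the leader. In other words, the GroupCoordinator tells only the leader consumer about the group members' metadata. The reason is explained later in this article.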
    def onCompleteJoin(group: GroupMetadata) {
      group.inLock {
        group.notYetRejoinedMembers.foreach { failedMember =>
          removeHeartbeatForLeavingMember(group, failedMember)
          group.remove(failedMember.memberId)
        }

        if (!group.is(Dead)) {
          group.initNextGeneration()
          if (group.is(Empty)) {
            info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
              s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

            groupManager.storeGroup(group, Map.empty, error => {
              if (error != Errors.NONE) {
                warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
              }
            })
          } else {
            info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
              s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

            for (member <- group.allMemberMetadata) {
              assert(member.awaitingJoinCallback != null)
              val joinResult = JoinGroupResult(
                members = if (group.isLeader(member.memberId)) {
                  group.currentMemberMetadata
                } else {
                  Map.empty
                },
                memberId = member.memberId,
                generationId = group.generationId,
                subProtocol = group.protocolOrNull,
                leaderId = group.leaderOrNull,
                error = Errors.NONE)

              member.awaitingJoinCallback(joinResult)
              member.awaitingJoinCallback = null
              completeAndScheduleNextHeartbeatExpiration(group, member)
            }
          }
        }
      }
    }
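The leader-only rule for the members field can be shown on its own. The following Java sketch is a reduced model of that one expression: it takes the full member metadata map and returns either the whole map or an empty one. The names are hypothetical and byte arrays stand in for the serialized subscription metadata.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class JoinResultSketch {
    // Leader: receives every member's metadata. Follower: receives nothing.
    static Map<String, byte[]> membersFor(String memberId,
                                          String leaderId,
                                          Map<String, byte[]> allMemberMetadata) {
        return memberId.equals(leaderId)
                ? allMemberMetadata
                : Collections.<String, byte[]>emptyMap();
    }

    public static void main(String[] args) {
        Map<String, byte[]> all = new LinkedHashMap<>();
        all.put("c1", new byte[] {1});
        all.put("c2", new byte[] {2});
        System.out.println(membersFor("c1", "c1", all).keySet()); // leader sees both members
        System.out.println(membersFor("c2", "c1", all).keySet()); // follower sees none
    }
}
```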
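The initNextGeneration method also deserves attention. Incrementing generationId is easy to understand. selectProtocol is needed because each consumer may be configured with different assignment strategies, so selectProtocol holds a vote to elect one PartitionAssignor.
The last point to note is that the group state changes from PreparingRebalance to CompletingRebalance: all consumers have now joined the group and are waiting for the partition assignment to be handed out through the GroupCoordinator.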
    def initNextGeneration() = {
      assert(notYetRejoinedMembers == List.empty[MemberMetadata])
      if (members.nonEmpty) {
        generationId += 1
        protocol = Some(selectProtocol)
        transitionTo(CompletingRebalance)
      } else {
        generationId += 1
        protocol = None
        transitionTo(Empty)
      }
      receivedConsumerOffsetCommits = false
      receivedTransactionalOffsetCommits = false
    }
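The body of selectProtocol is not shown in this article, so the following Java sketch is only one plausible way to implement the vote it describes: restrict the candidates to protocols that every member supports, let each member vote for its most preferred candidate, and pick the protocol with the most votes. All names are invented, and ties are broken by first appearance, which is an arbitrary choice of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ProtocolVote {
    // Each inner list is one member's protocols in order of preference.
    static String selectProtocol(List<List<String>> memberProtocols) {
        if (memberProtocols.isEmpty()) {
            throw new IllegalStateException("cannot select a protocol for an empty group");
        }
        // Candidates: protocols supported by every member.
        Set<String> candidates = new LinkedHashSet<>(memberProtocols.get(0));
        for (List<String> supported : memberProtocols) {
            candidates.retainAll(supported);
        }
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("members share no common protocol");
        }
        // Each member votes for the first candidate in its own preference list.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> preferred : memberProtocols) {
            for (String name : preferred) {
                if (candidates.contains(name)) {
                    votes.merge(name, 1, Integer::sum);
                    break;
                }
            }
        }
        // The protocol with the most votes wins.
        String winner = null;
        int best = -1;
        for (Map.Entry<String, Integer> e : votes.entrySet()) {
            if (e.getValue() > best) {
                best = e.getValue();
                winner = e.getKey();
            }
        }
        return winner;
    }
}
```

For instance, with members preferring [range, roundrobin], [range, roundrobin] and [roundrobin, range], both protocols are candidates and "range" wins two votes to one.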
Invoking the callback
The awaitingJoinCallback invoked above is in fact sendResponseCallback. The final response, a JoinGroupResponse, is built as follows:
    def sendResponseCallback(joinResult: JoinGroupResult) {
      val members = joinResult.members map { case (memberId, metadataArray) => (memberId, ByteBuffer.wrap(metadataArray)) }
      def createResponse(requestThrottleMs: Int): AbstractResponse = {
        val responseBody = new JoinGroupResponse(requestThrottleMs, joinResult.error, joinResult.generationId,
          joinResult.subProtocol, joinResult.memberId, joinResult.leaderId, members.asJava)
        responseBody
      }
      sendResponseMaybeThrottle(request, createResponse)
    }
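How the client consumer handles the response
On the consumer side, the sendJoinGroupRequest method shown at the beginning not only sends the request but also registers the response handler. We only look at the case where the error is Errors.NONE, in which the handler does two things:
1. It initializes generation, which can be understood as the number of rebalances so far, i.e. a version number.
2. It checks the leader memberId returned by the server: if the current consumer is the leader, it calls onJoinLeader, otherwise it calls onJoinFollower.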
    private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = joinResponse.error();
            if (error == Errors.NONE) {
                sensors.joinLatency.record(response.requestLatencyMs());

                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                                joinResponse.memberId(), joinResponse.groupProtocol());
                        if (joinResponse.isLeader()) {
                            onJoinLeader(joinResponse).chain(future);
                        } else {
                            onJoinFollower().chain(future);
                        }
                    }
                }
            } else {
                // handling of the other error codes is omitted in this excerpt
            }
        }
    }
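The leader check in the handler comes down to comparing two ids from the response. The Java sketch below models just that decision with a tiny value class; it is not the real JoinGroupResponse, and the class, its fields and the nextStep helper are assumptions made for illustration.

```java
final class JoinResponseSketch {
    final int generationId;
    final String memberId; // id the coordinator assigned to this consumer
    final String leaderId; // id of the member the coordinator chose as leader

    JoinResponseSketch(int generationId, String memberId, String leaderId) {
        this.generationId = generationId;
        this.memberId = memberId;
        this.leaderId = leaderId;
    }

    // This consumer is the leader when both ids match.
    boolean isLeader() {
        return memberId.equals(leaderId);
    }

    // Name of the method the handler would call next.
    String nextStep() {
        return isLeader() ? "onJoinLeader" : "onJoinFollower";
    }

    public static void main(String[] args) {
        System.out.println(new JoinResponseSketch(1, "c1-uuid", "c1-uuid").nextStep());
        System.out.println(new JoinResponseSketch(1, "c2-uuid", "c1-uuid").nextStep());
    }
}
```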
Both onJoinLeader and onJoinFollower send a SyncGroupRequest. The only difference is that onJoinLeader computes the assignment and passes it in the SyncGroupRequest, whereas onJoinFollower passes an empty map.
    private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());

            SyncGroupRequest.Builder requestBuilder =
                    new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
            log.debug("Sending leader SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
            return sendSyncGroupRequest(requestBuilder);
        } catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

    private RequestFuture<ByteBuffer> onJoinFollower() {
        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
                        Collections.<String, ByteBuffer>emptyMap());
        log.debug("Sending follower SyncGroup to coordinator {}: {}", this.coordinator, requestBuilder);
        return sendSyncGroupRequest(requestBuilder);
    }
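performAssignment is not shown here, so as an illustration of what a leader might compute, the Java sketch below implements a simplified range-style assignment for a single topic: members are sorted, each gets numPartitions / members partitions, and the first numPartitions % members members get one extra. It is a stand-in under those assumptions, not Kafka's assignor, and all names are invented.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {
    // Returns memberId -> partition numbers of one topic.
    static Map<String, List<Integer>> assign(List<String> memberIds, int numPartitions) {
        if (memberIds.isEmpty()) {
            throw new IllegalArgumentException("at least one member is required");
        }
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);
        int perMember = numPartitions / sorted.size();
        int withExtra = numPartitions % sorted.size();
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Members before index withExtra each hold one extra partition.
            int start = perMember * i + Math.min(i, withExtra);
            int length = perMember + (i < withExtra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) {
                partitions.add(p);
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        Collections.addAll(members, "c2", "c1", "c3");
        System.out.println(assign(members, 7)); // {c1=[0, 1, 2], c2=[3, 4], c3=[5, 6]}
    }
}
```

A follower would skip this computation entirely and send an empty map, as onJoinFollower does.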
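Summary
Process analysis
The main purpose of the JoinGroupRequest is for a consumer to report its subscription to the GroupCoordinator. The core of the GroupCoordinator's handling is addMemberAndRebalance: it wraps the member's information in a MemberMetadata and adds it to the GroupMetadata, and at that moment the first consumer becomes the leader. What follows can be understood as wrapping the rebalance in a DelayedJoin task and placing it in a delay queue, while the group state changes from Empty to PreparingRebalance. The response is returned to the clients only when the delayed task completes. It mainly carries the leader member's id and, for the leader only, the metadata of all group members.
After receiving the response, the client (consumer) checks whether the leader memberId returned by the broker is its own. If it is, it calls onJoinLeader, which computes each consumer's partitions with the partition assignment algorithm and then sends a SyncGroupRequest. If it is not the leader, it calls onJoinFollower, which also sends a SyncGroupRequest, but with an empty assignment.
So why send an empty SyncGroupRequest? Because the GroupCoordinator accepts only the leader member's assignment. The other consumers send an empty SyncGroupRequest just so that the GroupCoordinator returns the leader's assignment to them. For a non-leader consumer, the empty request itself is not the point; the partition assignment carried in its response is.
Difficult points
The hardest part of JoinGroupRequest is understanding InitialDelayedJoin and DelayedJoin. InitialDelayedJoin did not exist in early Kafka source code. It was added later because starting an application triggers several rebalances in a row, which is why the last line of Kafka's server.properties file sets group.initial.rebalance.delay.ms: configuring some delay makes sense. Once that delay expires, the onComplete of the parent class DelayedJoin is triggered, which calls onCompleteJoin of GroupCoordinator, and only there are the responses to the JoinGroupRequest returned to all consumers.
Flowchart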