private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse){ try { // perform the leader synchronization and send back the assignment for the group Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
private def doSyncGroup(group: GroupMetadata, generationId: Int, memberId: String, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback){ group.inLock { group.currentState match { case Empty | Dead => // 省略 ... case PreparingRebalance => // 省略 ... case CompletingRebalance => // 同样的暂存回调,在延迟任务完成时触发 group.get(memberId).awaitingSyncCallback = responseCallback
// 只处理leader consumer if (group.isLeader(memberId)) { // fill any missing members with an empty assignment val missing = group.allMembers -- groupAssignment.keySet val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// 持久化保存到__consumer_offset groupManager.storeGroup(group, assignment, (error: Errors) => { group.inLock { // another member may have joined the group while we were awaiting this callback, // so we must ensure we are still in the CompletingRebalance state and the same generation // when it gets invoked. if we have transitioned to another state, then do nothing if (group.is(CompletingRebalance) && generationId == group.generationId) { if (error != Errors.NONE) { resetAndPropagateAssignmentError(group, error) maybePrepareRebalance(group) } else { // 正常的逻辑 setAndPropagateAssignment(group, assignment) group.transitionTo(Stable) } } } }) }
case Stable => // if the group is stable, we just return the current assignment val memberMetadata = group.get(memberId) responseCallback(memberMetadata.assignment, Errors.NONE) completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) } } } }