/**
 * Leader-side handling of a join-group response.
 *
 * The visible Java code calls {@code performAssignment} with the leader id, group protocol
 * and member list read from {@code joinResponse}, and stores the result in
 * {@code groupAssignment} (member id to serialized assignment, per the declared
 * {@code Map<String, ByteBuffer>} type).
 *
 * NOTE(review): this snippet looks like a whitespace-collapsed paste and is not compilable
 * as it stands:
 * - Each {@code //} comment runs to the end of its physical line, so the statements that
 *   follow a comment on the same line are currently commented out.
 * - The {@code try} opened here has no visible {@code catch}/{@code finally}, and no
 *   {@code return} for the declared {@code RequestFuture<ByteBuffer>} is visible.
 * - The lines after the method header use Scala syntax ({@code val}, {@code =>},
 *   {@code Array.empty[Byte]}) rather than Java, with keywords fused to the following
 *   identifier ({@code valmissing=}, {@code caseStable=>}); presumably they come from a
 *   different source file — TODO confirm and split.
 * - The closing braces at the end do not appear to match the braces opened in this snippet.
 *
 * @param joinResponse the join-group response whose leader id, group protocol and members
 *                     are passed to {@code performAssignment}
 */
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) { try { // perform the leader synchronization and send back the assignment for the group Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
// only handle the leader consumer if (group.isLeader(memberId)) { // fill any missing members with an empty assignment valmissing= group.allMembers -- groupAssignment.keySet valassignment= groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// persist to __consumer_offset groupManager.storeGroup(group, assignment, (error: Errors) => { group.inLock { // another member may have joined the group while we were awaiting this callback, // so we must ensure we are still in the CompletingRebalance state and the same generation // when it gets invoked. if we have transitioned to another state, then do nothing if (group.is(CompletingRebalance) && generationId == group.generationId) { if (error != Errors.NONE) { resetAndPropagateAssignmentError(group, error) maybePrepareRebalance(group) } else { // normal path setAndPropagateAssignment(group, assignment) group.transitionTo(Stable) } } } }) }
caseStable=> // if the group is stable, we just return the current assignment valmemberMetadata= group.get(memberId) responseCallback(memberMetadata.assignment, Errors.NONE) completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId)) } } } }