// 消费者组未到stable状态 if (state != MemberState.STABLE) { // the group is not stable (perhaps because we left the group or because the coordinator // kicked us out), so disable heartbeats and wait for the main thread to rejoin. disable(); continue; }
if (coordinatorUnknown()) { if (findCoordinatorFuture != null || lookupCoordinator().failed()) // the immediate future check ensures that we backoff properly in the case that no // brokers are available to connect to. // 重试 AbstractCoordinator.this.wait(retryBackoffMs); } elseif (heartbeat.sessionTimeoutExpired(now)) { // the session timeout has expired without seeing a successful heartbeat, so we should // probably make sure the coordinator is still healthy. // 标记Coordinator未知,也在告诉了其他操作 markCoordinatorUnknown(); } elseif (heartbeat.pollTimeoutExpired(now)) { // 两次poll间隔超过了maxPollIntervalMs // the poll timeout has expired, which means that the foreground thread has stalled // in between calls to poll(), so we explicitly leave the group. maybeLeaveGroup(); } elseif (!heartbeat.shouldHeartbeat(now)) { // timeToNextHeartbeat返回的时间还没到 // poll again after waiting for the retry backoff in case the heartbeat failed or the // coordinator disconnected AbstractCoordinator.this.wait(retryBackoffMs); } else { // 记录心跳发送时间 heartbeat.sentHeartbeat(now); // 发送心跳 sendHeartbeatRequest().addListener(newRequestFutureListener<Void>() { @Override publicvoidonSuccess(Void value) { synchronized (AbstractCoordinator.this) { // 记录心跳接收时间 heartbeat.receiveHeartbeat(time.milliseconds()); } }
@Override publicvoidonFailure(RuntimeException e) { synchronized (AbstractCoordinator.this) { if (e instanceof RebalanceInProgressException) { // it is valid to continue heartbeating while the group is rebalancing. This // ensures that the coordinator keeps the member in the group for as long // as the duration of the rebalance timeout. If we stop sending heartbeats, // however, then the session timeout may expire before we can rejoin. // 在rebalance期间的心跳也算 heartbeat.receiveHeartbeat(time.milliseconds()); } else { heartbeat.failHeartbeat(); // 唤醒,找wait // wake up the thread if it's sleeping to reschedule the heartbeat AbstractCoordinator.this.notify(); } } } }); } } } } // 省略各种异常处理 }
@Override publicvoidonFailure(RuntimeException e) { synchronized (AbstractCoordinator.this) { if (e instanceof RebalanceInProgressException) { // it is valid to continue heartbeating while the group is rebalancing. This // ensures that the coordinator keeps the member in the group for as long // as the duration of the rebalance timeout. If we stop sending heartbeats, // however, then the session timeout may expire before we can rejoin. // 在rebalance期间的心跳也算 heartbeat.receiveHeartbeat(time.milliseconds()); } else { heartbeat.failHeartbeat(); // 唤醒,找wait // wake up the thread if it's sleeping to reschedule the heartbeat AbstractCoordinator.this.notify(); } } } });