public ConsumerRecords<K, V> poll(final Duration timeout) {
    return poll(timeout.toMillis(), true);
}

private ConsumerRecords<K, V> poll(final long timeoutMs, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        if (timeoutMs < 0)
            throw new IllegalArgumentException("Timeout must not be negative");

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        // track the elapsed time so we never block past the caller's timeout
        long elapsedTime = 0L;
        do {
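Before following the loop into the coordinator code, it helps to see what the two guard clauses above protect against. A minimal usage sketch, assuming a local broker at localhost:9092 and a topic named demo-topic (both placeholders, not part of the source being discussed):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Skipping this subscribe() (or an assign()) call is exactly what the
            // hasNoSubscriptionOrUserAssignment() check above guards against.
            consumer.subscribe(Collections.singletonList("demo-topic"));

            // A negative Duration would trip the "Timeout must not be negative" check.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
        }
    }
}

Inside the do/while loop the consumer delegates to the coordinator; that is the code shown next.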
            /**
             * Checks whether topic-partitions were assigned manually; if they were, this block is
             * skipped and the else branch is not interesting here.
             */
            if (subscriptions.partitionsAutoAssigned()) {
                // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
                // group proactively due to application inactivity even if (say) the coordinator cannot be found.
                // Checks whether the time until the next heartbeat has reached zero and, if so, wakes the
                // heartbeat thread to send a heartbeat. It also records lastPoll, which is compared against
                // maxPollIntervalMs to decide whether a LeaveGroup request (a proactive rebalance) must be sent.
                /**
                 * Ignoring coordinatorUnknown() and rejoinNeededOrPending() below, the normal path ends here.
                 */
                pollHeartbeat(currentTime);

                // The coordinator node is null or unavailable.
                // It is null on the very first poll.
                if (coordinatorUnknown()) {
                    // ... (body omitted in this excerpt)
                }

                if (rejoinNeededOrPending()) {
                    // due to a race condition between the initial metadata fetch and the initial rebalance,
                    // we need to ensure that the metadata is fresh before joining initially. This ensures
                    // that we have matched the pattern against the cluster's topics at least once before joining.
                    if (subscriptions.hasPatternSubscription()) {
                        // Pattern subscription is rarely used; code omitted...
                    }

                    // Go straight to this call: inside it, JoinGroup and SyncGroup requests drive the
                    // rebalance until the member reaches the STABLE state.
                    if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                        return false;
                    }
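Two configuration knobs govern the liveness behaviour described above: the heartbeat thread keeps the session alive between poll() calls, while the gap between successive poll() calls is checked against max.poll.interval.ms to decide whether the member should proactively leave the group. A sketch of the relevant settings with illustrative (not recommended) values:

import java.util.Properties;

public class GroupLivenessConfigSketch {
    static Properties groupLivenessProps() {
        Properties props = new Properties();
        // Maximum gap between two poll() calls before the consumer's heartbeat thread
        // sends a LeaveGroup request on its behalf, forcing a rebalance.
        props.put("max.poll.interval.ms", "300000");
        // How often the background heartbeat thread pings the group coordinator.
        props.put("heartbeat.interval.ms", "3000");
        // The coordinator evicts the member if no heartbeat arrives within this window.
        props.put("session.timeout.ms", "10000");
        return props;
    }
}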
        while (rejoinNeededOrPending()) { // true on the first call
            if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
                return false;
            }
            elapsedTime = time.milliseconds() - startTimeMs;

            if (needsJoinPrepare) {
                // true the first time through; generation is still Generation.NO_GENERATION
                // Mainly fires the ConsumerRebalanceListener and, if auto-commit is enabled, tries to commit offsets.
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            // Joining the group for the first time; the future completes with the member's assignment
            // once the JoinGroup/SyncGroup round finishes.
            // The listener registered in initiateJoinGroup() sets rejoinNeeded to false on success,
            // so if this rebalance succeeds the while loop exits.
            final RequestFuture<ByteBuffer> future = initiateJoinGroup();
            client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));

            // If the future is not done yet, whether it will eventually succeed or fail, we have timed out.
            if (!future.isDone()) {
                // we ran out of time
                return false;
            }

            // The request succeeded.
            if (future.succeeded()) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();

                // We reset the join group future only after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                resetJoinGroupFuture(); // joinFuture is reset to null
                needsJoinPrepare = true;
            } else {
                resetJoinGroupFuture();
                final RuntimeException exception = future.exception();
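onJoinPrepare() is what ultimately fires the application-level ConsumerRebalanceListener (and, with enable.auto.commit=true, attempts a commit first). A sketch of registering such a listener, reusing the consumer type from the earlier sketch; the topic name and log messages are placeholders:

import java.util.Collection;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class RebalanceListenerSketch {
    static void subscribeWithListener(Consumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("demo-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Invoked from onJoinPrepare() before re-joining: commit offsets or flush state here.
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Invoked once the new assignment has been installed after SyncGroup completes.
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}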
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group with generation {}", generation.generationId);
                    // Update two important fields:
                    state = MemberState.STABLE; // the consumer is now stable
                    rejoinNeeded = false;       // the flag that lets the outer while loop exit

                    // The heartbeat thread that was disabled earlier is re-enabled here.
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }
            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}
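The two callbacks above implement a small state machine on the member side. A condensed sketch of the states involved; the enum mirrors the MemberState names used in the snippet, and the comments only summarize the transitions shown above, not Kafka's full state handling:

enum MemberState {
    UNJOINED,    // not part of a group, and the fallback state when a JoinGroup attempt fails
    REBALANCING, // JoinGroup sent; waiting for the JoinGroup/SyncGroup round to complete
    STABLE       // join succeeded: rejoinNeeded = false and the heartbeat thread is re-enabled
}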
private static class TopicPartitionState {
    private Long position; // last consumed position
    private Long highWatermark; // the high watermark from last fetch
    private Long logStartOffset; // the log start offset
    private Long lastStableOffset; // the last stable offset from the last fetch (read_committed isolation)
    private boolean paused; // whether this partition has been paused by the user
    private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
    private Long nextAllowedRetryTimeMs; // earliest time at which a failed offset reset may be retried
}
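These per-partition fields are what the public consumer API reads and mutates. A sketch of the corresponding calls, reusing the consumer type and topic from the first sketch (partition 0 is an assumption):

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionStateSketch {
    static void inspectAndPause(Consumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("demo-topic", 0); // hypothetical partition

        long position = consumer.position(tp);               // backed by TopicPartitionState.position
        System.out.println("next offset to fetch: " + position);

        consumer.pause(Collections.singleton(tp));           // sets paused = true; poll() skips this partition
        consumer.resume(Collections.singleton(tp));          // clears the paused flag
        consumer.seekToBeginning(Collections.singleton(tp)); // requests an offset reset to the log start
    }
}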