// get the list of partitions with data ready to send // 获取可发送的分区所在的broker节点集合 RecordAccumulator.ReadyCheckResultresult=this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update if (!result.unknownLeaderTopics.isEmpty()) { // The set of topics with unknown leader contains topics with leader election pending as well as // topics which may have expired. Add the topic again to metadata to ensure it is included // and request metadata update, since there are messages to send to the topic. // 有分区leader未知的topic,等待下次元数据更新 for (String topic : result.unknownLeaderTopics) this.metadata.add(topic);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
this.metadata.requestUpdate(); }
// remove any nodes we aren't ready to send to // 移除网络通信异常的node Iterator<Node> iter = result.readyNodes.iterator(); longnotReadyTimeout= Long.MAX_VALUE; while (iter.hasNext()) { Nodenode= iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
// create produce requests // 按照broker id分组 Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List<ProducerBatch> batchList : batches.values()) { for (ProducerBatch batch : batchList) this.accumulator.mutePartition(batch.topicPartition); } }
// find the minimum magic version used when creating the record sets byteminUsedMagic= apiVersions.maxUsableProduceMagic(); for (ProducerBatch batch : batches) { if (batch.magic() < minUsedMagic) minUsedMagic = batch.magic(); }
for (ProducerBatch batch : batches) { TopicPartitiontp= batch.topicPartition; MemoryRecordsrecords= batch.records();