TopicPartition tp = null;
try {
    throwIfProducerClosed();
    // first make sure the metadata for the topic is available
    ClusterAndWaitTime clusterAndWaitTime;
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
    Cluster cluster = clusterAndWaitTime.cluster;
    // ... (key/value serialization and partition selection elided)
    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs);
    if (result.batchIsFull || result.newBatchCreated) {
        log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        this.sender.wakeup();
    }
    return result.future;
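For context, here is a minimal sketch of the caller side that drives this doSend() path. The broker address, topic name, and class name are placeholders I made up for illustration; only the KafkaProducer API itself is real.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Each send() call goes through doSend(): wait for metadata, serialize,
            // pick a partition, then append to the RecordAccumulator as shown above.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("appended to %s-%d @ offset %d%n",
                                    metadata.topic(), metadata.partition(), metadata.offset());
                    });
        }
    }
}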
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending threads to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // batch.size defaults to 16KB, but exceeding it is fine: Math.max picks the larger of the two
        // we don't have an in-progress record batch try to allocate a new batch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));

        // here a ByteBuffer is allocated from the buffer pool
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            // I'm not entirely sure of the intent yet; roughly, it covers the extreme case where
            // the producer is closed again after this thread has acquired the deque lock.
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // If a usable ProducerBatch appeared in the meantime, we don't need to create
                // a new one. How convenient!
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }
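The shape of this method, try under the lock, allocate outside the lock, then re-try under the lock, is worth isolating. Buffer allocation can block waiting for pool memory, so it must happen outside the synchronized block; the price is that another thread may have opened a batch in the meantime, hence the second tryAppend. Here is a toy sketch of the same pattern, using StringBuilder as a stand-in for ProducerBatch; none of these names are Kafka's.

import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the pattern above; not Kafka code.
class DoubleCheckedAppend {
    private static final int BATCH_CAPACITY = 16;
    private final Deque<StringBuilder> dq = new ArrayDeque<>();

    String append(String record) {
        synchronized (dq) {                 // fast path: the last open batch may still have room
            if (tryAppend(record)) return record;
        }
        // Allocation may block (in Kafka, free.allocate can wait for pool memory),
        // so it happens outside the lock to avoid stalling other appenders.
        StringBuilder fresh = new StringBuilder(BATCH_CAPACITY);
        synchronized (dq) {
            if (tryAppend(record)) return record; // re-check: someone else may have
                                                  // opened a batch while we allocated
            fresh.append(record);
            dq.addLast(fresh);
            return record;
        }
    }

    private boolean tryAppend(String record) {
        StringBuilder last = dq.peekLast();
        if (last != null && last.length() + record.length() <= BATCH_CAPACITY) {
            last.append(record);
            return true;
        }
        return false;
    }
}

In the toy the freshly allocated buffer is simply dropped when the re-check succeeds; in the real code the unused ByteBuffer is returned to the pool via free.deallocate(buffer) in a finally block, which is why buffer is declared before the try.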
/**
 * Get the deque for the given topic-partition, creating it if necessary.
 */
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
    Deque<ProducerBatch> d = this.batches.get(tp);
    if (d != null)
        return d;
    d = new ArrayDeque<>();
    Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
    if (previous == null)
        return d;
    else
        return previous;
}
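The get-then-putIfAbsent dance exists because two threads can race to create the deque for the same partition: both may allocate an ArrayDeque, but putIfAbsent guarantees only one is published and both callers end up sharing it. On JDK 8+ the same idiom collapses into a single computeIfAbsent call, as in this sketch (DequeRegistry is my own illustrative class, not Kafka code):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DequeRegistry {
    private final ConcurrentMap<String, Deque<String>> batches = new ConcurrentHashMap<>();

    // The idiom used by getOrCreateDeque: the loser of the putIfAbsent race
    // discards its deque and adopts the winner's.
    Deque<String> getOrCreate(String key) {
        Deque<String> d = batches.get(key);
        if (d != null) return d;
        d = new ArrayDeque<>();
        Deque<String> previous = batches.putIfAbsent(key, d);
        return previous == null ? d : previous;
    }

    // Since JDK 8, computeIfAbsent expresses the same thing in one atomic call.
    Deque<String> getOrCreateCompact(String key) {
        return batches.computeIfAbsent(key, k -> new ArrayDeque<>());
    }
}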
/**
 * Try to append to a ProducerBatch.
 *
 * If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 * resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 * and memory records built) in one of the following cases (whichever comes first): right before send,
 * if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
ProducerBatch#tryAppend
Note: be careful not to confuse this with the identically named method above.
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        this.recordCount++;
        return future;
    }
}
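The hasRoomFor() check at the top is what ties this method back to configuration: a batch stops accepting records once it reaches batch.size (unless a single record is already larger, as the Math.max in append() allowed for). A sketch of the producer settings that govern this behavior; the values are just illustrative, not tuning advice.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

class BatchingConfig {
    static Properties batchingProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // 16KB, the room hasRoomFor() checks against
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms for a batch to fill up
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32MB pool that free.allocate() draws from
        return props;
    }
}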
/**
 * Write the record to `out` and return its size.
 */
public static int writeTo(DataOutputStream out,
                          int offsetDelta,
                          long timestampDelta,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) throws IOException {
    int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
    ByteUtils.writeVarint(sizeInBytes, out);

    byte attributes = 0; // there are no used record attributes at the moment
    out.write(attributes);

    // ... (timestamp delta, offset delta, key and value writes elided)

    if (headers == null)
        throw new IllegalArgumentException("Headers cannot be null");

    ByteUtils.writeVarint(headers.length, out);

    for (Header header : headers) {
        String headerKey = header.key();
        if (headerKey == null)
            throw new IllegalArgumentException("Invalid null header key found in headers");
        // ...
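ByteUtils.writeVarint shows up everywhere in this record format: lengths and deltas are written as zigzag base-128 varints so that small values, positive or negative, take a single byte. Here is a standalone sketch of that encoding, assuming the same zigzag plus 7-bits-per-byte scheme; the helper below is mine, not Kafka's.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class VarintSketch {
    // Zigzag maps small negative ints to small unsigned ints (0 -> 0, -1 -> 1, 1 -> 2, ...),
    // then base-128 encoding writes 7 bits per byte, high bit meaning "more bytes follow".
    static void writeVarint(int value, DataOutputStream out) throws IOException {
        int v = (value << 1) ^ (value >> 31); // zigzag encode
        while ((v & 0xFFFFFF80) != 0) {
            out.writeByte((v & 0x7F) | 0x80); // emit low 7 bits with continuation flag
            v >>>= 7;
        }
        out.writeByte(v); // final byte, continuation flag clear
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeVarint(300, new DataOutputStream(buf));
        System.out.println(buf.size() + " bytes"); // 300 zigzags to 600, encoded in 2 bytes
    }
}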