Kafka生产者源码浅析(二)
上篇文章中对Spring-kafka源码做了追踪,也对原生的KafkaProducer做了部分解析,对关键类事先说明,帮助读者理解源码,克服对源码的恐惧心理
doSend的方法很长,我们分部拆解1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// 省略部分代码,catch处理
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
throwIfProducerClosed();
// first make sure the metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
byte[] serializedValue;
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
return result.future;
}
}
Part one
1 | TopicPartition tp = null; |
throwIfProducerClosed做的很简单,看看Sender线程是否活着
waitOnMetadata返回一个ClusterAndWaitTime对象,里面是broker集群的元信息和获取信息的耗时,这个耗时算在了max.block.ms中,它控制这send方法的最大执行时间
waitOnMetadata通过唤醒sender线程,依靠NetworkClient.poll()方法来更新元数据
Cluster 类信息如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public final class Cluster {
private final boolean isBootstrapConfigured;
// 所有的broker节点
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
// 内部topic,如_consumer_offset
private final Set<String> internalTopics;
// controller节点
private final Node controller;
// 每个分区对应的分区信息
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
// 每个topic所有分区的信息
private final Map<String, List<PartitionInfo>> partitionsByTopic;
// topic所有可用分区的信息
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
// 每个broker节点的所有分区
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
// 按照nodeId组成map
private final Map<Integer, Node> nodesById;
// 里面只有一个clusterId熟悉
private final ClusterResource clusterResource;
}
具体是如何初始化的,可以看一下Cluster构造函数的源码
上面出现的PartitionInfo, 这些信息想必大家已经很熟悉1
2
3
4
5
6
7
8
9
10
11
12
13
14public class PartitionInfo {
// 主题
private final String topic;
// 分区
private final int partition;
// leader分区所在broker
private final Node leader;
// 副本所在broker
private final Node[] replicas;
// ISR副本所在broker
private final Node[] inSyncReplicas;
// 离线副本所在broker
private final Node[] offlineReplicas;
}
Part two
这一部分比较简单,对key和value序列化
1 | byte[] serializedKey; |
分区
接下来关注kafka是如何根据key为消息计算分区的
1 | int partition = partition(record, serializedKey, serializedValue, cluster); |
第二行代码是将topic和分区包装成一个TopicPartition类,重点关注第一行代码
partition方法会尝试获取消息中的partition,如果用户指定了分区,此时就不用计算了,否则使用partitioner计算分区
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
DefaultPartitioner
partitioner.partition的具体实现在DefaultPartitioner#partition,其源码如下:
1 | public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { |
回顾前文,Cluster封装了broker的很多信息,其中就用一个Map封装了topic的partition信息
1 | Map<String, List<PartitionInfo>> partitionsByTopic |
此时要分区,首先要获取这个topic的PartitionInfo,第一行代码的作用就是这个,map.get(topic),很简单
接下分两种情况:用户指定了key,和未指定key,我们知道旧版本的kafka在用户未指定key的情况下会默认将消息分配到某一个分区,
但这样会造成数据倾斜,官方后来对此作了优化,采用轮询(round-robin)的方式,简单提一下这块的代码
随机分配
kafka会初始化一个很大的伪随机数放在AtomicInteger中:1
2
3
4
5
6
7
8
9
10
11private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
以topic为key保存在一个ConcurrentHashMap中,每次用完counter自增并返回,这就是nextValue方法的作用
接下来从Cluster中获取可用的分区信息,获取分区数,使用counter对其取模,然后从可用分区列表中获取一个分区,由于counter的自增,达到了轮询(round-robin)的效果。但如果没有可用的分区,则从所有分区中挑选(有种破罐子破摔的味道)
Utils.toPositive用于取绝对值,kafka选择了一个cheap way: 与运算
以上是对消息中没有key的情况下如何分配分区的分析,至于有key的情况就比较简单了:对key做murmur2 hash运算,然后对分区数取模
自定义分区策略
实现Partitioner接口即可,配置方式参考拦截器,二者同理,参数名称为: partitioner.class
Part three
1 | setReadOnly(record.headers()); |
先把不重要的说了,这几行代码的可读性很好,设置消息头只读,然后估算消息的总大小,确保不会超出max.request.size和buffer.memory的大小,获取消息的时间戳,用户指定的优先,最后构建一个InterceptorCallback回调对象,它会先指定拦截器的onAcknowledgement回调,然后执行用户指定的Callback#onCompletion
追加至缓存并发送
1 | RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs); |
首先思考下缓存区的数据结构是什么:每一个分区都有一个先进先出的队列,,而kafka真正使用的是一个双端队列
RecordAccumulator为topic的每一个分区都创建了一个ArrayDeque(thread unsafe),里面存放的元素是ProducerBatch,它就是待批量发送的消息。
kafka使用一个CopyOnWriteMap保存分区和队列的关系,即只有在修改该map时把内容Copy出去形成一个新的map,然后配合volatile改变引用,这也是COW机制的常见用法
1 | ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>(); |
该map的模型如下
append方法返回一个RecordAppendResult,它是消息在添加进内存缓冲区后的结果:Deque队列中是否有元素,是否有新的ProducerBatch创建,两个条件都可以去通知sender线程发送消息
这里的代码看似很多,其实并不难,我们还是逐步分析下
1 | public RecordAppendResult append(TopicPartition tp, |
下面截取部分调用的代码进行讲解
创建队列
1 | /** |
从ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
中获取该主题分区对应的队列,如果不为空说明已经有了,直接返回,否者创建一个新的ArrayDeque,并放到map中,方便下次使用,
至于putIfAbsent方法,就是map中之前没有这个key,插入并返回新value,已经有了,就返回之前的value,即Deque
RecordAccumulator#tryAppend方法源码:
其实文档写的很清楚了,就是把消息追加到最后一个ProducerBatch中,但要是队列中一个都没有呢? 很简单,直接返回null,在外层方法中会判断不为null在结束,否则会分配
吐槽下:一开始就看岔了,好几个tryAppend,如果是我,我会写成tryAppendInternal之类的方法名
RecordAppendResult构造方法的最后一个参数表示是否是新建的ProducerBatch,这里返回时也确实返回了false1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20/**
* Try to append to a ProducerBatch.
*
* If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
* resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
* and memory records built) in one of the following cases (whichever comes first): right before send,
* if it is expired, or when the producer is closed.
*/
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
ProducerBatch#tryAppend
注:一定注意和上面的同名方法的区分
1 | public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { |
具体写入过程
recordsBuilder.append的主要实现过程如下:
首先key,value都会包装成ByteBuffer,写入时都是先写长度,再写内容
key,value,headers都写入DataOutputStream appendStream流对象中,返回写入的长度
1 | /** |
append方法小结
append方法的具体实现过程还是很复杂的,这里说下笔者对这个过程的理解:
- 尝试获取该TopicPartition下的队列,如果没有则创建
- 获取队列的最后一个ProducerBatch元素,将消息添加至该ProducerBatch,该过程会对Deque加锁
- 如果队列里没有ProducerBatch,或是最后一个ProducerBatch已经满了,就需要新建一个ProducerBatch
- 分配一个ByteBuffer空间,该空间大小在batch.size和消息大小中取较大值
- 再重新尝试步骤2一次,万一这时候刚好又有了呢(这时候Deque已经释放锁了)
- 创建好ProducerBatch之后,继续尝试append,添加成功之后将future和callback放入一个Thunk对象中,并且添加到一个List
集合,这是因为一批消息需要发送之后才有回调,所以先把回调统一放入一个集合中 - 添加成功之后,返回future对象,将ProducerBatch添加至Deque队列,同时用一个集合IncompleteBatches持有住了ProducerBatch
- 清理buffer空间,封装RecordAppendResult结果:Deque队列大小,新建的ProducerBatch对象是否已满
总结
kafka发送消息的步骤大致如下:
- 获取broker上的元信息
- key, value的序列化
- 计算分区
- 添加到缓存区
- 获取该分区对应的队列
- 尝试添加
- 如果添加成功返回