Kafka生产者源码浅析(一)
本文并没有直接使用原生的kafka-client,而是spring-kafka,版本为2.2.3.RELEASE。在当前以Spring-boot为首的潮流中,有必要学习Spring是如何集成kafka客户端的
send方法
以KafkaTemplate#send方法为入口,使用debug方式跟进源码
1 |
|
这里将消息封装为ProducerRecord对象,这是kafka-client原生对象,接下来进行发送操作
在doSend方法中,有很多事务相关,日志相关的代码,我们的目的是理清楚主流程,因此省略
1 | // 省略部分代码 ... |
可以看到首先通过getTheProducer获取生产者对象,那么Spring-kafka是如何创建该对象的呢?
构建生产者
代码只有一行,通过DefaultKafkaProducerFactory创建生产者1
2
3
4private Producer<K, V> getTheProducer() {
// 省略部分代码 ...
return this.producerFactory.createProducer();
}
进入到DefaultKafkaProducerFactory#createProducer
1 |
|
我们知道kafka生产者是单例并且线程安全的,这里spring使用double-check构建了一个CloseSafeProducer对象,而它被volatile修饰,经典的懒汉单例模式
平常我们使用的都是KafkaProducer,这个CloseSafeProducer又是什么呢?
该类实现了Producer接口,这也是KafkaProducer的父接口,细心的同学发现了CloseSafeProducer在创建是调用了createKafkaProducer方法,该方法源码如下
1 | protected Producer<K, V> createKafkaProducer() { |
坑爹呢这是,这个不还是KafkaProducer对象吗,那么传入一个KafkaProducer是要干吗,对装饰者模式和代理模式熟悉的同学已经明白是怎么回事,spring也确实这样做的:具体功能实现都委托给KafkaProducer对象实现,spring对记录事务id等日志信息做了增强
1 | protected static class CloseSafeProducer<K, V> implements Producer<K, V> { |
CloseSafeProducer的分析至此结束,在获取到包装后的KafkaProducer后,便是发送流程了
消息发送
回到doSend方法,发送的代码只有两行1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37// 省略部分代码...
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
final Producer<K, V> producer = getTheProducer();
final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>();
producer.send(producerRecord, buildCallback(producerRecord, producer, future));
if (this.autoFlush) {
flush();
}
return future;
}
// 省略部分代码...
private Callback buildCallback(final ProducerRecord<K, V> producerRecord, final Producer<K, V> producer,
final SettableListenableFuture<SendResult<K, V>> future) {
return (metadata, exception) -> {
try {
if (exception == null) {
future.set(new SendResult<>(producerRecord, metadata));
if (KafkaTemplate.this.producerListener != null) {
KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
}
}
else {
future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
if (KafkaTemplate.this.producerListener != null) {
// producerListener 默认是LoggingProducerListener,仅在错误是打印日志
KafkaTemplate.this.producerListener.onError(producerRecord, exception);
}
}
}
finally {
if (!KafkaTemplate.this.transactional) {
closeProducer(producer, false);
}
}
};
}
SettableListenableFuture是一个可设置,可监听的Future对象,用它构建异步发送消息后的Callback对象,大家可以认为Spring使用SettableListenableFuture对象对返回结果和异常进行了封装,Callback的作用在下文揭晓。
ListenableFuture同样的也可以添加回调函数,使用方法如下1
2
3
4
5
6ListenableFuture future = kafkaTemplate.send(record);
future.addCallback(result -> {
System.out.println(result);
}, error->{
System.out.println(error);
});
接着send方法由CloseSafeProducer委托给KafkaProducer执行,KafkaProducer的send方法如下
1 |
|
ProducerInterceptor通过for循环遍历依次执行1
2
3
4
5
6
7
8// 省略部分代码...
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
interceptRecord = interceptor.onSend(interceptRecord);
}
return interceptRecord;
}
send方法的官方文档翻译如下:1
2
3
4
5
6
7
8
9
10
11
12异步发送一条消息到一个topic,并且在应答之后立即调用已提供的回调
发送是异步的,一旦消息存储到了等待发送的缓冲区,该方法会立即返回。这样就不用阻塞在等待每一次发送消息的响应,允许并行的发送大量消息。
发送后的结果对象RecordMetadata具体说明了消息被发送到了哪个分区,被分配的位移和时间戳。
如果topic使用了TimestampType#CREATE_TIME,那么使用用户指定的时间,如果未指定,则使用发送时间。
如果使用了TimestampType#LOG_APPEND_TIME,则使用消息在broker端追加到日志的时间
send方法会为RecordMetadata对象返回一个Future对象,调用Future#get将会阻塞到请求完成,返回消息的元数据,或者返回在发送请求期间的任何异常
如果你想模拟一下,你可以send之后立即调用get
producer.send(record).get()
完全非阻塞的用法是用Callback参数来提供一个回调,它将在请求结束之后被调用
producer.send(myRecord, new Callback(){...})
Callback将在producer的I/O线程中触发,所以它必须轻量,快速,否则其他线程的消息会延迟发送。如果你在Callback中有耗时的逻辑处理,建议使用你自己的Executor,在Callback体中并发的执行
Spring同样支持同步和异步,将结果和异常都保存在了SettableListenableFuture中
这里再提一下Callback和Producerinterceptor的使用
Callback
这里提一下Callback类,这是一个函数式接口,仅有一个onCompletion方法1
public void onCompletion(RecordMetadata metadata, Exception exception);
两个参数分别为成功之后的消息元数据对象,和失败之后的异常对象,两者总是只有一个不为空(要么成功,要么失败),而Exception分为两类异常:可重试异常,不可重试异常
可重试
1 | CorruptRecordException |
不可重试1
2
3
4
5InvalidTopicException
OffsetMetadataTooLargeException
RecordBatchTooLargeException
RecordTooLargeException
UnknownServerException
可重试异常都继承自RetriableException,常见的判断方式如下:1
2
3
4
5if(e instanceof RetriableException){
...
} else {
...
}
拦截器
在发送消息之前,开发者都可以自定义拦截器,实现Producerinterceptor即可1
2
3
4
5// 每条消息发送之前调用
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
//发送请求应答之后调用
public void onAcknowledgement(RecordMetadata metadata, Exception exception);
public void close();
添加拦截器
kafka原生配置方式
1 | List<String> interceptors = new ArrayList<>(); |
在spring-kafka中配置更加简单1
spring.kafka.producer.properties.interceptor.classes=your class
消息发送(doSend)
经过拦截器拦截后,发送消息的流程又是如何呢
上图摘自胡夕老师的《Apache kafka实战》,十分形象的描绘了消息发送流程,正如上图所示,doSend方法只有有一个入参ProducerRecord,用于封装消息,一个出参RecordMetadata,它是broker应答之后的返回信息。二者的源码如下:
1 | // key长度,value长度可计算 |
这里提一下ProducerRecord的timestamp,取决于message.timestamp.type的配置
CreateTime:客户端发送消息时的时间戳,默认值。
LogAppendTime:消息在broker追加日志时的时间戳。
预备知识
为了便于理解接下来的流程,有几个类需要为大家介绍清楚
在KafkaProducer的构造函数中初始化了以下几个关键类,有兴趣的读者可自行研究,可省略JMX和事务相关的内容
Partitioner:分区选择器,你要发送的这条消息应该分配到哪个分区?
1
2this.partitioner =
config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG,Partitioner.class);KafkaClient:用于和broker做网络交互的客户端
1
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(...);
Sender:用于批量发送消息的I/O线程,也称sender线程
1
this.sender = new Sender(...);
sender.wakeup()的作用是:消息达到了batch.size了,起来干活
KafkaThread:继承了Thread,构造函数可以传入线程名,和设置守护线程
1
2this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();RecordAccumulator:消息累加器,其实也就是常说的消息的内存缓冲区
在前期基本工作做好后,kafka便可以开始发送了,发送过程比较复杂,首先要获取broker端集群信息,broker到底是个什么情况,地址是什么,有几台服务器,里面已有的topic,topic已有的分区,分区在broker的分布,ISR列表,OLR列表等等信息,这些都是发送之前要关心的
- Metadata:这些元信息都封装在了Metadata类中,Metadata还负责这些元信息的缓存及刷新
Cluster: Metadata中持有一个Cluster对象,kafka每一个broker都保存了topic的leader副本分区信息,producer只需要随机向一个broker发送请求就可以获取获取到,同时该对象还有每一个kafka broker节点的元信息,如ip端口等
TopicPartition:将topic和计算好的分区封装到一起
- InterceptorCallback:如果没有拦截器,它就是Callback回调
总结
本来文章主要介绍了spring对kafka-client生产者做了哪些封装,spring使用ProducerFactory来创建KafkaProducer对象,将其传给CloseSafeProducer作为委托对象
ListenableFuture作为send方法的返回值,在buildCallback方法中对Kafka原生的Callback做了封装,并加入了spring自己的producerListener
本文至此结束,剥离spring的封装,接下来发送的具体源码将在下文揭晓