Two kinds of clients send fetch requests: consumers (clients) and followers. A consumer fetch is bounded by the high watermark, while a follower fetch is not. This article only covers the client side; follower fetches involve replica synchronization and will be analyzed separately.
Terminology recap
Two terms matter here: HW (HighWatermark) and LEO (Log End Offset).
The HW is only advanced after a message written by the producer has been replicated to the follower replicas. Messages below the HW are called committed, and a consumer can only consume messages below the HW.
The LEO, by contrast, is updated as soon as a message is written to the local leader replica, and its value is the offset right after the last message. (In the referenced figure, offset 15 is drawn with a dashed line to indicate that there is no message at the LEO position.)
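As a toy illustration of the relationship between these two offsets (hypothetical values, not Kafka code):

case class PartitionState(logStartOffset: Long, highWatermark: Long, logEndOffset: Long)

object HwLeoSketch extends App {
  // say the leader has appended offsets 0..14, so LEO = 15, but the followers have only
  // replicated up to offset 11, so HW = 12
  val state = PartitionState(logStartOffset = 0L, highWatermark = 12L, logEndOffset = 15L)

  // a consumer may only read offsets in [logStartOffset, HW); offsets 12..14 are not yet committed
  def consumableOffsets(s: PartitionState): Seq[Long] = s.logStartOffset until s.highWatermark

  println(consumableOffsets(state)) // 0 until 12
}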
Source code analysis
The FETCH request sent by the client
As of version 2.0.1 the Fetch request is already at V8. The FetchRequest sent by the client is built in the Fetcher#sendFetches method:
final FetchSessionHandler.FetchRequestData data = entry.getValue();
The request body is slightly involved. First look at data.toSend(), which returns a Map<TopicPartition, PartitionData>, reflecting the fact that one consumer can consume multiple partitions of multiple topics:
public static final class PartitionData {
    // the offset to start fetching from
    public final long fetchOffset;
    // the fetcher's log start offset (only meaningful for follower fetches; consumers send -1)
    public final long logStartOffset;
    // max.partition.fetch.bytes: the maximum number of bytes to fetch from this partition
    public final int maxBytes;
}
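For context, these fields are driven by familiar consumer configs. Below is a small sketch of the relevant settings (the values are just the documented defaults, shown for illustration):

import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig

object FetchConfigSketch extends App {
  val props = new Properties()
  props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576") // per-partition cap -> PartitionData.maxBytes
  props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800")          // cap on the whole fetch response
  props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1")                 // the broker waits for at least this many bytes...
  props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")             // ...or until this much time has passed
}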
data.metadata() returns a FetchMetadata that mainly carries two fields, epoch and sessionId, while data.toForget() returns an array of partitions.
Most of the parameters should be familiar; epoch, sessionId and toForget exist purely for the FetchSession mechanism, a feature added in 1.1.0. Here I only describe the background that led to it.
Background of FetchSession
What problem does FetchSession solve?
Once the topics and partitions in a Kafka cluster reach a certain scale, a huge number of Fetch requests is generated, both for consumption and for replica synchronization, and the latter dominates. Suppose there are 100 topics with 3 partitions each and 3 replicas per partition: that is 100 × 3 × 2 = 600 follower replicas sending Fetch requests at the same time, plus the consumer traffic that spikes during busy periods, so the Fetch QPS becomes very high.
The Fetch request body itself is also large, usually tens of KB, which makes it a performance bottleneck and calls for trimming the request. Yet most of its fields, such as the subscribed partitions and the fetch parameters, rarely change, so they can be cached on the server side and the client can refer to the whole session with a session id.
Most of the remaining parameters are familiar, so let us focus on the two unusual ones: metadata and toForget.
They were added after Kafka 1.1.0 to implement FetchSession, which mainly spares the consumer from resending the full partition list on every poll even when nothing new has arrived on the server; when a topic has many partitions, FetchSession noticeably slims down the Fetch request body.
Remember that a client usually subscribes to more than one topic and is assigned more than one TopicPartition. Before sending FETCH requests, the consumer has to know which broker hosts the leader replica of each partition and then group the partitions by broker, so large Fetch request bodies are a real concern and the optimization is well worth it, as sketched below.
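A minimal sketch of that grouping idea (hypothetical partitions and broker names, not the real Fetcher code):

object GroupByLeaderSketch extends App {
  final case class TopicPartition(topic: String, partition: Int)

  // hypothetical leader assignment of three partitions across two brokers
  val leaderOf: Map[TopicPartition, String] = Map(
    TopicPartition("orders", 0) -> "broker-1",
    TopicPartition("orders", 1) -> "broker-2",
    TopicPartition("clicks", 0) -> "broker-1"
  )

  // one FetchRequest is built per broker, covering every partition that broker leads
  val byBroker: Map[String, Iterable[TopicPartition]] = leaderOf.keys.groupBy(leaderOf)

  byBroker.foreach { case (broker, tps) => println(s"$broker <- fetch for $tps") }
}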
The figure referenced here shows the Fetch request body format; the fields highlighted in its red box (the FetchSession-related ones) can be set aside for now and will be covered in a later analysis of FetchSession.
Handling the FETCH request
Like other requests, the FETCH request is handled in the KafkaApis class. Some code, such as the FetchSession-related parts, is omitted here so we can focus on the core fetch flow:
def handleFetchRequest(request: RequestChannel.Request) {
handleFetchRequest mainly filters the usable TopicPartitions of the request into the interesting variable and finally passes them, together with the response callback, to replicaManager.fetchMessages; the processResponseCallback callback is analyzed at the end.
ReplicaManager#fetchMessages
fetchMessages mainly calls readFromLog -> readFromLocalLog to read the messages; readFromLocalLog returns LogReadResult objects. If the fetch request comes from a follower replica for synchronization, updateFollowerLogReadResults is also called to update the sync state; that part is covered in detail in the replica-synchronization article of this Kafka server source series.
Because a consumer fetch is governed by parameters such as fetch.max.wait.ms, fetch.min.bytes and fetch.max.bytes, it may not be able to complete immediately. In that case a DelayedFetch (a DelayedOperation) is created and put into the purgatory, where a later event will complete the request; the purgatory can be thought of simply as a delay queue. A sketch of this decision follows the fetchMessages signature below.
def fetchMessages(timeout: Long,
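Conceptually, the decision of whether to respond right away or to create a DelayedFetch looks roughly like the sketch below (a simplification, not the real ReplicaManager code):

object DelayedFetchSketch extends App {
  // timeout corresponds to fetch.max.wait.ms, fetchMinBytes to fetch.min.bytes
  def respondImmediately(timeout: Long,
                         fetchMinBytes: Int,
                         bytesReadable: Long,
                         errorReadingData: Boolean,
                         noPartitionsRequested: Boolean): Boolean =
    timeout <= 0 ||                     // the client is not willing to wait at all
      noPartitionsRequested ||          // nothing was asked for
      bytesReadable >= fetchMinBytes || // enough data has already accumulated
      errorReadingData                  // errors are returned right away

  // otherwise a DelayedFetch (a DelayedOperation) is created and handed to the fetch purgatory;
  // it completes once enough data arrives or once fetch.max.wait.ms expires
  println(respondImmediately(timeout = 500, fetchMinBytes = 1, bytesReadable = 0,
    errorReadingData = false, noPartitionsRequested = false)) // false -> parked in the purgatory
}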
Next, let us look at readFromLocalLog to see how messages are actually read.
The readFromLocalLog method
First be clear about the method's inputs and output: the inputs were annotated in detail above, and the return value, also mentioned above, is a collection of (TopicPartition, LogReadResult) pairs.
readFromLocalLog mainly iterates over the partitions in the request and calls the nested read method defined beforehand. read first finds the leader replica, prepares all the read parameters, and finally calls the read method of the Log object.
After getting each result, the outer readFromLocalLog subtracts the size of the messages already read from fetchMaxBytes inside the loop, roughly as sketched below.
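The byte accounting of that outer loop can be sketched like this (hypothetical sizes; minOneMessage handling is left out for brevity):

object ReadLoopSketch extends App {
  // hypothetical number of bytes each partition could return
  val available = Seq("t0-0" -> 4000, "t0-1" -> 3000, "t1-0" -> 5000)

  var limitBytes = 8000 // the request-level fetch.max.bytes budget (example value)
  val results = available.map { case (tp, bytes) =>
    val readSize = math.min(bytes, limitBytes) // each partition only gets the remaining budget
    limitBytes -= readSize
    tp -> readSize
  }
  results.foreach(println) // t1-0 only gets the 1000 bytes left in the budget
}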
Log#read
We know that a Log is only a logical concept; physically it is a series of segment files, each with its own starting offset (baseOffset).
The fetch has to start reading at fetchOffset, so the natural first step is to locate the segment file to read; to speed up that lookup, Kafka maintains index files. Once the segment is found, its read method is called under the fetchMaxBytes budget (which keeps shrinking inside the loop) and the high-watermark bound, and a FetchDataInfo result is returned. That is essentially everything this method does; a minimal sketch of the segment lookup follows.
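A minimal sketch of that segment lookup, using the same skip-list idea (the segment file names and offsets are hypothetical):

import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch extends App {
  // baseOffset -> segment file, mirroring Log.segments (a ConcurrentSkipListMap keyed by baseOffset)
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(0L, "00000000000000000000.log")
  segments.put(100L, "00000000000000000100.log")
  segments.put(200L, "00000000000000000200.log")

  // floorEntry returns the entry with the greatest baseOffset <= fetchOffset,
  // i.e. the segment that should contain the requested offset
  val fetchOffset = 150L
  println(segments.floorEntry(fetchOffset).getValue) // 00000000000000000100.log
}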
With that in mind, here is the source of the Log#read method:
/**
 * Read messages from the log.
 *
 * @param startOffset where to start fetching, i.e. the fetchOffset of the fetch request:
 *                    The offset to begin reading at
 * @param maxLength the fetch maxBytes minus the bytes already read:
 *                  The maximum number of bytes to read
 * @param maxOffset the upper bound of the fetch, i.e. the high watermark:
 *                  The offset to read up to, exclusive. (i.e. this offset NOT included in the resulting message set)
 * @param minOneMessage whether to fetch at least one message even if its size exceeds maxBytes:
 *                      If this is true, the first message will be returned even if it exceeds `maxLength` (if one exists)
 */
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None, minOneMessage: Boolean = false,
isolationLevel: IsolationLevel): FetchDataInfo = {
maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")
// Because we don't use lock for reading, the synchronization is a little bit tricky.
// We create the local variables to avoid race conditions with updates to the log.
// local variables avoid races with concurrent log updates; nextOffsetMetadata.messageOffset is the LEO
val currentNextOffsetMetadata = nextOffsetMetadata
val next = currentNextOffsetMetadata.messageOffset
// if startOffset equals the LEO there is nothing to read; the abortedTransactions part below is transactional and not our concern here
if (startOffset == next) {
val abortedTransactions =
if (isolationLevel == IsolationLevel.READ_COMMITTED) Some(List.empty[AbortedTransaction])
else None
return FetchDataInfo(currentNextOffsetMetadata, MemoryRecords.EMPTY, firstEntryIncomplete = false,
abortedTransactions = abortedTransactions)
}
// segments is a skip-list-backed map: the key is each segment's baseOffset, the value is the LogSegment
// floorEntry finds the LogSegment whose baseOffset <= startOffset, i.e. it locates the segment to read from
// segmentEntry is an entry: <baseOffset, LogSegment>
var segmentEntry = segments.floorEntry(startOffset)
// return error on attempt to read beyond the log end offset or read below log start offset
// error handling: startOffset beyond the LEO is invalid, so is not finding a segment; startOffset < logStartOffset can also happen when offsets below logStartOffset have been logically deleted
if (startOffset > next || segmentEntry == null || startOffset < logStartOffset)
throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
s"but we only have log segments in the range $logStartOffset to $next.")
/**
 * Read from the segment whose baseOffset is <= the requested offset; if that segment has no
 * messages at or above it, keep moving to later segments until some messages are found or
 * the end of the log is reached.
 */
// Do the read on the segment with a base offset less than the target offset
// but if that segment doesn't contain any messages with an offset greater than that
// continue to read from successive segments until we get some messages or we reach the end of the log
while (segmentEntry != null) {
// take out the LogSegment
val segment = segmentEntry.getValue
// If the fetch hits the active segment (the last segment, the one still being written to), two fetches racing
// in the window after a message is appended but before nextOffsetMetadata is updated could make the second
// fetch throw OffsetOutOfRangeException, so the read is capped at the exposed position (the maxPosition
// variable below) rather than at the end of the active segment.
// If the fetch occurs on the active segment, there might be a race condition where two fetch requests occur after
// the message is appended but before the nextOffsetMetadata is updated. In that case the second fetch may
// cause OffsetOutOfRangeException. To solve that, we cap the reading up to exposed position instead of the log
// end of the active segment.
/**
 * maxPosition: if segmentEntry is the last entry (the active segment) this is the exposed LEO position,
 * otherwise it is the size of the current segment
 */
val maxPosition = {
if (segmentEntry == segments.lastEntry) {
val exposedPos = nextOffsetMetadata.relativePositionInSegment.toLong
// check again: this guards against the race where a new segment rolls between the two reads above
// Check the segment again in case a new segment has just rolled out.
if (segmentEntry != segments.lastEntry)
// New log segment has rolled out, we can read up to the file end.
segment.size
else
exposedPos
} else {
segment.size
}
}
/**
 * A recap of these arguments:
 * startOffset: where to start reading
 * maxOffset: the read upper bound, i.e. the high watermark
 * maxLength: the maxBytes to read
 * maxPosition: the exposed file-position limit (the LEO position for the active segment, otherwise the segment size)
 * minOneMessage: whether to return at least the first message
 */
val fetchInfo = segment.read(startOffset, maxOffset, maxLength, maxPosition, minOneMessage)
if (fetchInfo == null) {
segmentEntry = segments.higherEntry(segmentEntry.getKey)
} else {
return isolationLevel match {
// the default is READ_UNCOMMITTED; this fetchInfo is the return value
case IsolationLevel.READ_UNCOMMITTED => fetchInfo
case IsolationLevel.READ_COMMITTED => addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
}
}
}
// if the while loop above walked through the last segment without returning, the messages we wanted have all been deleted; return an empty result in that case
// okay we are beyond the end of the last segment with no data fetched although the start offset is in range,
// this can happen when all messages with offset larger than start offsets have been deleted.
// In this case, we will return the empty set with log end offset metadata
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
}
LogSegment#read
LogSegment#read is close to the bottom layer. The previous section already located the target segment through the <baseOffset, Segment> map, but keep in mind that a segment is 1 GB by default, and finding data inside such a large file requires the index.
Kafka first binary-searches the index for the matching offset and file position, then reads the messages in that range through FileRecords.slice (a toy illustration of the index lookup follows the source below):
/**
 * @param startOffset where to start reading:
 *                    A lower bound on the first offset to include in the message set we read
 * @param maxOffset the read upper bound, i.e. the high watermark:
 *                  An optional maximum offset for the message set we read
 * @param maxSize the fetch maxBytes minus the bytes already read:
 *                The maximum number of bytes to include in the message set we read
 * @param maxPosition the exposed position limit (the LEO position or the segment size):
 *                    The maximum position in the log segment that should be exposed for read
 * @param minOneMessage whether to return at least the first message:
 *                      If this is true, the first message will be returned even if it exceeds `maxSize` (if one exists)
 *
 * @return The fetched data and the offset metadata of the first message whose offset is >= startOffset,
 *         or null if the startOffset is larger than the largest offset in this log
 */
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int, maxPosition: Long = size,
minOneMessage: Boolean = false): FetchDataInfo = {
// size of the log file in bytes
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
// look up the offset and file position from the index file
val startOffsetAndSize = translateOffset(startOffset)
// if the start position is already off the end of the log, return null (Log#read then moves on to the next segment)
if (startOffsetAndSize == null)
  return null
val startPosition = startOffsetAndSize.position
val offsetMetadata = new LogOffsetMetadata(startOffset, this.baseOffset, startPosition)
val adjustedMaxSize =
if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
else maxSize
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val fetchSize: Int = maxOffset match {
  case None =>
    // no max offset was given, just read up to the max position
    min((maxPosition - startPosition).toInt, adjustedMaxSize)
  case Some(offset) =>
    // there is a max offset (the high watermark): translate it to a file position and use it to cap the read size
    val mapping = translateOffset(offset, startPosition)
    val endPosition =
      if (mapping == null)
        logSize // the max offset is off the end of the log, use the end of the file
      else
        mapping.position
    min(min(maxPosition, endPosition) - startPosition, adjustedMaxSize).toInt
}
// log.slice is what actually fetches the message bytes
FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}
Handling the returned results
After unwinding through all those layers we are back in the original handleFetchRequest method; let us see how processResponseCallback processes the read results and returns them to the consumer.
Part of the quota/throttling code is omitted...
The callback's input is a Seq of (TopicPartition, FetchPartitionData), i.e. the read result of each partition:
// the callback for process a fetch response, invoked before throttling
def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
// convert FetchPartitionData to PartitionData
val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
responsePartitionData.foreach { case (tp, data) =>
val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
partitions.put(tp, new FetchResponse.PartitionData(data.error, data.highWatermark, lastStableOffset,
data.logStartOffset, abortedTransactions, data.records))
}
// partitions that hit an error must also return their respective error information
erroneous.foreach { case (tp, data) => partitions.put(tp, data) }
// When this callback is triggered, the remote API call has completed.
// Record time before any byte-rate throttling.
request.apiRemoteCompleteTimeNanos = time.nanoseconds
var unconvertedFetchResponse: FetchResponse[Records] = null
// a fetch from a follower for replica synchronization
if (fetchRequest.isFromFollower) {
// We've already evaluated against the quota and are good to go. Just need to record it now.
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
val responseSize = sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
quotas.leader.record(responseSize)
trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData().size()}, " +
s"metadata=${unconvertedFetchResponse.sessionId()}")
sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
} else {
// Fetch size used to determine throttle time is calculated before any down conversions.
// This may be slightly different from the actual response size. But since down conversions
// result in data being loaded into memory, we should do this only when we are not going to throttle.
//
// Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
// quotas have been violated. If both quotas have been violated, use the max throttle time between the two
// quotas. When throttled, we unrecord the recorded bandwidth quota value
// mostly throttling-related code, ignore it for now
val responseSize = fetchContext.getResponseSize(partitions, versionId)
val timeMs = time.milliseconds()
val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request)
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
if (maxThrottleTimeMs > 0) {
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendResponse)
} else {
quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
}
// If throttling is required, return an empty response.
unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
} else {
// Get the actual response. This will update the fetch context.
// this is the key line: it creates the response object; full and incremental responses are built differently, more on that with FetchSession later
unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
}
// Send the response immediately.
// send the response to the Processor's responseQueue
sendResponse(request, Some(createResponse(maxThrottleTimeMs)), Some(updateConversionStats))
}
/**
 * A fairly simple method: maybeConvertFetchedData handles the message down-conversion needed for
 * version compatibility, then the bytes-out metric is recorded, and finally the FetchResponse is returned.
 */
def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
// Down-convert messages for each partition if required
val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
unconvertedFetchResponse.responseData().asScala.foreach { case (tp, unconvertedPartitionData) =>
if (unconvertedPartitionData.error != Errors.NONE)
debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
}
// Prepare fetch response from converted data
val response = new FetchResponse(unconvertedFetchResponse.error(), convertedData, throttleTimeMs,
unconvertedFetchResponse.sessionId())
response.responseData.asScala.foreach { case (topicPartition, data) =>
// record the bytes out metrics only when the response is being sent
brokerTopicStats.updateBytesOut(topicPartition.topic, fetchRequest.isFromFollower, data.records.sizeInBytes)
}
info(s"fetch response is ${response}")
response
}
def updateConversionStats(send: Send): Unit = {
send match {
case send: MultiRecordsSend if send.recordConversionStats != null =>
send.recordConversionStats.asScala.toMap.foreach {
case (tp, stats) => updateRecordConversionStats(request, tp, stats)
}
case _ =>
}
}
}
Summary
The objects involved in handling a fetch request are largely the same as for a produce request. A few points worth noting:
- fetch requests come from either consumers or followers; the server tells them apart with the replicaId field, which is -1 for a consumer (see the sketch after this list)
- consumer reads are bounded by the high watermark, follower reads are not
- constrained by the various fetch parameters, a consumer fetch may not be answered immediately; it is parked in the purgatory delay queue until it can be completed
- the response callback touches throttling, FetchSession and message down-conversion
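A minimal sketch of the first point (the -1 constant mirrors the consumer replica id; this is an illustration, not the broker code):

object FetchOriginSketch extends App {
  val ConsumerReplicaId: Int = -1 // consumers fetch with replicaId = -1, followers use their broker id

  def isFromFollower(replicaId: Int): Boolean = replicaId >= 0

  println(isFromFollower(ConsumerReplicaId)) // false: a consumer fetch, bounded by the HW
  println(isFromFollower(2))                 // true: a follower on broker 2, not bounded by the HW
}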
Some figures are adapted from:
https://www.cnblogs.com/huxi2b/p/9335064.html
https://www.cnblogs.com/huxi2b/p/7453543.html