在KafkaController初始化的过程中,多次遇见了LeaderAndIsr请求,这是broker之间通信的一个重要请求,它也是副本同步的关键步骤,本文主要分析KafkaApis对该请求的处理
ControllerChannelManager
在讲解LeaderAndIsr请求之前,我们先来看下ControllerChannelManager,在kafka-server端源码分析之Controller选举与初始化我曾提到过它,说它是broker之间通信的管理器,那么它是如何工作的呢?
又见内存队列
和ControllerEventManager一样,ControllerChannelManager也是用的异步内存队列来处理请求的发送,它只用于Controller节点和其它broker通信,它的大致原理如下:
- ControllerBrokerRequestBatch用3个Map分别维护了leaderAndIsrRequest,stopReplicaRequest,updateMetadataRequest三种请求的缓存
- 当KafkaController等组件想要发送请求时,仅仅是通过addXXXRequestForBrokers方法,将请求参数添加到缓存中,而在调用sendRequestsToBrokers方法后,它会遍历3中请求的缓存,将请求参数,回调函数等封装为QueueItem对象,放入一个类型为BlockingQueue[QueueItem]的messageQueue中
- 在RequestSendThread线程启动后,从messageQueue中取出请求对象,发送请求,响应后调用回调函数进行处理
请求流程如下
请求对象解析
添加LeaderAndIsr请求到缓存
虽然这个方法很简单,但我需要提2个关键点
- 第一个参数叫brokerIds,但是调用时传的是replicaIds或者Isr,这里要加强大家对副本id即brokerId的印象
- 注意最后面还添加了一个UpdateMetadata请求
1 | /** |
请求体的推理需要点篇幅,我这里直接贴出一个请求的样例json, 其中isNew只有在新分区,新副本请求时才为true,leaderEpoch在后续的副本同步会讲到,用于保证recovery时的数据一致性
liveLeaders表示的是上面partitionStates参数中每个分区leader所在的broker(存活的)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30{
"version":1,
"controllerId":1,
"controllerEpoch":1,
"partitionStates":[
{
"TopicPartition":"test-0",
"PartitionState":{
"isNew":false,
"basePartitionState":{
"controllerEpoch":1,
"leader":1,
"leaderEpoch":1,
"isr":[ 0,1,2],
"zkVersion":1,
"replicas":[ 0,1,2]
}
}
}
],
"liveLeaders":[
{
"id":1,
"idString":"1",
"host":"localhost",
"port":9092,
"rack":"rack-1"
}
]
}
打破思维定式的假设
这里我主要想分享一点我的经验,不要死心眼的认为broker有3个,比如现在的情况是
15台broker,有一个叫test的topic,它有12个分区,每个分区3个副本,以第一个分区test-0为例,它目前的leader是8,即第8台broker上的test-0分区的副本是leader, ISR列表为[8,10,14], 它的replica是[8,10,14],即所有副本都在同步列表
现在Controller是broker-0,LeaderAndISR请求需要变更一批分区的信息,其中刚好有一个要把test-0的leader变为10,因此它要向broker 8,10,14发送LeaderAndIsr请求,下面的请求讲解都以这个为例
topic分区 | 原leader副本 | 原ISR与Replica | 变更后的leader副本 | 变更后的ISR与Replica | 场景 |
---|---|---|---|---|---|
foo-1 | 9 | [5,9,10] | 5 | [5,9,10] | Preferred选举 |
test-0 | 8 | [8,10,14] | 10 | [8,10,14] | leader换选 |
bar-1 | 4 | [4,10,12] | 4 | [4,7,10,16,19] | 副本重分配 |
注:只有15台broker,最后一个有16,19不是我写错了
可以看到这一批LeaderAndIsr请求要发送到多个broker,leaderAndIsrRequestMap的类型是Map[brokerId, Map[TopicPartition, LeaderAndIsrRequest.PartitionState]],发送的代码如下
1 | leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) => |
可以看到kafka的本意就是积攒一批请求,然后按照brokerId分组,再发送出去,和生产者发送消息是同样的味道
再看上面的表格,变更后的ISR与Replica列表就是我们要发送的broker,我这里故意让三个分区都包含10,那么我们往broker-10发送的LeaderAndIsr请求同时包含3个分区的信息变更请求
KafkaApis处理LeaderAndIsr请求
LeaderAndIsr请求由handleLeaderAndIsrRequest方法处理,仅做了2件事:定义回调函数,认证预处理,关键的处理在调用的becomeLeaderOrFollower方法中
1 | def handleLeaderAndIsrRequest(request: RequestChannel.Request) { |
becomeLeaderOrFollower
该方法分为三段,前面部分只是做了下检查,中间部分是我们重点关注的
1 | def becomeLeaderOrFollower(correlationId: Int, |
topic分区 | 原leader副本 | 原ISR与Replica | 变更后的leader副本 | 变更后的ISR与Replica | 场景 |
---|---|---|---|---|---|
foo-1 | 9 | [5,9,10] | 5 | [5,9,10] | Preferred选举 |
test-0 | 8 | [8,10,14] | 10 | [8,10,14] | leader换选 |
bar-1 | 4 | [4,10,12] | 4 | [4,7,10,16,19] | 副本重分配 |
根据前面的假设,broker-0(controller)向broker-10发送了一批分区变更需求请求,假设当前处理请求的是broker-10,先进行分组
partitionsTobeLeader: 变更后的leader是当前broker,也就是broker-10的一组,即test-0分区,用makeLeaders方法处理
partitionsBecomeLeader: 其他的分区一组,即foo-1,bar-1,用makeFollowers方法处理
我们先看makeLeaders方法,它首先停止了这些副本的同步操作,然后遍历每个分区处理1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, LeaderAndIsrRequest.PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Errors]): Set[Partition] = {
// 返回结果
val partitionsToMakeLeaders = mutable.Set[Partition]()
// 从fetch线程中移除这些分区副本的同步操作
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
//遍历每一个分区,调用makeLeader
partitionState.foreach{ case (partition, partitionStateInfo) =>
if (partition.makeLeader(controllerId, partitionStateInfo, correlationId)) {
partitionsToMakeLeaders += partition
}
}
那么我们直接来到Partition的makeLeader方法,它是处理单个分区信息变更的方法
makeLeader
看代码之前先稍微解释下leader epoch以及leader-epoch-checkpoint文件
leader epoch在分区的leader副本变更时更新,每次更新加1,相当于记录了分区leader的更新次数,也可以理解为leader的版本号
leader-epoch-checkpoint在每一个分区日志目录都有一个,这里以topic为test-1,分区为0的日志目录为例
它的内容是一个key value,key是leader epoch,value是上一代leader的LEO,我们知道LEO是即将写入的下一条消息的offset,这里也可以理解为新leader要写入的第一条消息
它里面的内容一般是这样的,其他check-point文件也是同理1
2
30
1
2 9832
第一行的0表示版本号,第二行表示记录个数,第三行才是真正的数据
言归正传,继续看makeLeader方法。该方法更新了本地的一些缓存,如controllerEpoch,inSyncReplicas,leaderEpoch,leaderEpochStartOffsetOpt(上面说的value),zkVersion。接着更新了check-point文件
最后是关于新的leader副本的处理,比如初始化它的HW
1 | def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { |
makeLeader的作用可以简单归纳为:
- 更新本地缓存数据
- 更新leader epoch到文件
- 如果本地副本不是leader,那么初始化它的HW,以及同步相关的参数
处理流程如下:
Follower副本处理
在becomeLeaderOrFollower方法中,makeLeaders处理完leader副本后,makeFollowers方法处理follower副本
该方法同样是遍历每一个分区
注:该方法源码很长,但是都是打印日志,这里删除了很多源码
1 | private def makeFollowers(controllerId: Int, |
上面调用的makeFollower和makeLeader方法类似1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val newAssignedReplicas = partitionStateInfo.basePartitionState.replicas.asScala.map(_.toInt)
val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
val oldLeaderEpoch = leaderEpoch
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.basePartitionState.controllerEpoch
// add replicas that are new
newAssignedReplicas.foreach(r => getOrCreateReplica(r, partitionStateInfo.isNew))
// remove assigned replicas that have been removed by the controller
// 删除缓存里不要的副本了
(assignedReplicas.map(_.brokerId) -- newAssignedReplicas).foreach(removeReplica)
inSyncReplicas = Set.empty[Replica]
leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch
leaderEpochStartOffsetOpt = None
zkVersion = partitionStateInfo.basePartitionState.zkVersion
// leader是否更新了
if (leaderReplicaIdOpt.contains(newLeaderBrokerId) && (leaderEpoch == oldLeaderEpoch || leaderEpoch == oldLeaderEpoch + 1)) {
false
}
else {
leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
}
}
makeFollowers主要判断分区的leader副本是否发生了改变,如果改变了,就先移除原来的同步,重新向新leader同步
becomeLeaderOrFollower第三部分
becomeLeaderOrFollower在调用makeLeaders和makeFollowers之后,处理的源码如下
1 | leaderAndIsrRequest.partitionStates.asScala.keys.foreach(topicPartition => |
该部分主要是对future Replica的处理,它们会同步leader副本,之后清空空闲的fetcher线程,这里大家先理解一个fetcher线程管理了多个follower的同步
最后调用handleLeaderAndIsrRequest中的回调函数:onLeadershipChange,下面是该方法的源码
内部topic的特殊处理
kafka内部的topic有2个:__consumer_offsets
和__transaction_state
,它们的LeaderAndIsr处理比较复杂,这里不再展开细说。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
// for each new leader or follower, call coordinator to handle consumer group migration.
// this callback is invoked under the replica state change lock to ensure proper order of
// leadership changes
updatedLeaders.foreach { partition =>
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.handleGroupImmigration(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch)
}
updatedFollowers.foreach { partition =>
if (partition.topic == GROUP_METADATA_TOPIC_NAME)
groupCoordinator.handleGroupEmigration(partition.partitionId)
else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME)
txnCoordinator.handleTxnEmigration(partition.partitionId, partition.getLeaderEpoch)
}
}
总结
LeaderAndIsr请求是在分区leader或者副本集合发生变更时,Controller向其它broker发生的请求,broker在接收到请求后会看分区的新leader是否是当前broker的id
- 如果是,则先暂停该分区本地副本的同步,因为它们从follower变为leader了,然后更新元数据,记录leader epoch checkpoint等,最终初始化当前副本为leader副本
- 如果不是,则本地broker上的副本为follower副本,同样的更新本地缓存的元数据,此时按leader是否发生了改变分为2中情况
- leader改变了,那么移除当前同步线程对这些副本的同步,重新定位leader所在broker,以当前副本的HW为起始位移加入到副本同步线程中去
- leader没有变,什么都不做