This post is a light one: a quick walk through how Kafka connects to ZooKeeper, and how the code that listens for node events is designed.
A Brief Introduction to ZooKeeper
Kafka mainly uses ZooKeeper to elect the Controller. Here is a rough overview of ZooKeeper's basics, just enough for studying Kafka.
ZNode
Almost every ZooKeeper tutorial will tell you that ZooKeeper is a storage system shaped like a file-system directory tree. I'd put it differently: a directory in a file system cannot itself store data, whereas a ZooKeeper node can.
ZooKeeper nodes come in two main flavors: persistent and ephemeral. A persistent node survives a restart because it has been written to a disk file; an ephemeral node disappears when ZooKeeper restarts or when the client session that created it times out.
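To make the difference concrete, here is a minimal sketch using the raw ZooKeeper Java client from Scala (the connect string and paths are made up for illustration):

```scala
import java.util.concurrent.CountDownLatch

import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

object ZNodeDemo extends App {
  val connected = new CountDownLatch(1)
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit =
      if (event.getState == KeeperState.SyncConnected) connected.countDown()
  })
  connected.await() // block until the session is established

  // A persistent node survives client disconnects and ZooKeeper restarts
  zk.create("/demo-persistent", "some data".getBytes("UTF-8"),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

  // An ephemeral node is deleted automatically as soon as this session ends
  zk.create("/demo-ephemeral", Array.emptyByteArray,
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  zk.close() // closing the session removes /demo-ephemeral
}
```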
zkVersion
Just think of it as the version number of an optimistic lock.
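For instance, the setData API takes the version you read and fails if someone has written the node in the meantime. A minimal sketch, assuming zk is a connected ZooKeeper handle and /config is an existing node:

```scala
import org.apache.zookeeper.KeeperException.BadVersionException
import org.apache.zookeeper.data.Stat

val stat = new Stat()
val current = zk.getData("/config", false, stat) // fills stat, including the node's zkVersion

try {
  // Succeeds only if the node is still at the version we read: a compare-and-swap
  zk.setData("/config", "new value".getBytes("UTF-8"), stat.getVersion)
} catch {
  case _: BadVersionException => // someone else wrote first; re-read and retry
}
```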
chroot
The chroot comes into play when a single ZooKeeper cluster manages several Kafka clusters: each Kafka cluster then needs its own root node to keep them apart.
For example, we can configure this in Kafka's server.properties file: zookeeper.connect=localhost:2181/cluster_201
zkCli
In ZooKeeper's bin directory you can start the zkCli.sh script, then list a node's children with "ls <path>" and read the data a node stores with "get <path>".
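For example, a hypothetical session (the exact znodes and output vary by Kafka and ZooKeeper version):

```
[zk: localhost:2181(CONNECTED) 0] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 1] get /controller
{"version":1,"brokerid":0,"timestamp":"1540000000000"}
```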
If you already have a ZooKeeper GUI such as zkui or shepher, browsing is even more convenient.
How Kafka Elects the Controller
So how does Kafka use ZooKeeper's ephemeral nodes to elect the Controller?
At startup, every broker in the cluster tries to create the ephemeral /controller node. If the node does not exist yet and the creation succeeds, that broker becomes the Controller.
The other brokers find on their create attempt that the node already exists, and give up on becoming the Controller.
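A stripped-down sketch of that election logic (simplified: the real KafkaController also records the controller epoch and handles more failure modes; tryElect is a hypothetical helper):

```scala
import org.apache.zookeeper.KeeperException.NodeExistsException
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

// Hypothetical helper: every broker calls this at startup (and again on re-election)
def tryElect(zk: ZooKeeper, brokerId: Int): Boolean = {
  try {
    // EPHEMERAL: if the current Controller's session dies, the node vanishes,
    // registered watchers fire, and the surviving brokers can re-elect
    zk.create("/controller", s"""{"brokerid":$brokerId}""".getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    true // we created the node first: this broker is now the Controller
  } catch {
    case _: NodeExistsException => false // another broker won the election
  }
}
```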
Initializing ZooKeeper
Initializing ZooKeeper mainly means establishing the connection and registering listeners. The entry point is KafkaServer's startup method (startup), which calls initZkClient:
```scala
private def initZkClient(time: Time): Unit = {
  def createZkClient(zkConnect: String, isSecure: Boolean) =
    KafkaZkClient(zkConnect, isSecure, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs,
      config.zkMaxInFlightRequests, time)

  val chrootIndex = config.zkConnect.indexOf("/")
  val chrootOption = {
    if (chrootIndex > 0) Some(config.zkConnect.substring(chrootIndex))
    else None
  }

  val secureAclsEnabled = config.zkEnableSecureAcls
  val isZkSecurityEnabled = JaasUtils.isZkSecurityEnabled()

  if (secureAclsEnabled && !isZkSecurityEnabled)
    throw new java.lang.SecurityException(
      s"${KafkaConfig.ZkEnableSecureAclsProp} is true, but the verification of the JAAS login file failed.")

  chrootOption.foreach { chroot =>
    val zkConnForChrootCreation = config.zkConnect.substring(0, chrootIndex)
    val zkClient = createZkClient(zkConnForChrootCreation, secureAclsEnabled)
    zkClient.makeSurePersistentPathExists(chroot)
    info(s"Created zookeeper path $chroot")
    zkClient.close()
  }

  _zkClient = createZkClient(config.zkConnect, secureAclsEnabled)
  _zkClient.createTopLevelPaths()
}
```
createZkClient initializes a KafkaZkClient object; let's look at its apply factory method:
```scala
def apply(connectString: String,
          isSecure: Boolean,
          sessionTimeoutMs: Int,
          connectionTimeoutMs: Int,
          maxInFlightRequests: Int,
          time: Time,
          metricGroup: String = "kafka.server",
          metricType: String = "SessionExpireListener") = {
  val zooKeeperClient = new ZooKeeperClient(connectString, sessionTimeoutMs, connectionTimeoutMs,
    maxInFlightRequests, time, metricGroup, metricType)
  new KafkaZkClient(zooKeeperClient, isSecure, time)
}
```
The two most commonly used methods of KafkaZkClient are what guarantee delivery of ZooKeeper requests: any request that fails with CONNECTIONLOSS is retried once the connection is re-established.
```scala
private def retryRequestUntilConnected[Req <: AsyncRequest](request: Req): Req#Response = {
  retryRequestsUntilConnected(Seq(request)).head
}

private def retryRequestsUntilConnected[Req <: AsyncRequest](requests: Seq[Req]): Seq[Req#Response] = {
  val remainingRequests = ArrayBuffer(requests: _*)
  val responses = new ArrayBuffer[Req#Response]
  while (remainingRequests.nonEmpty) {
    val batchResponses = zooKeeperClient.handleRequests(remainingRequests)

    batchResponses.foreach(response => latencyMetric.update(response.metadata.responseTimeMs))

    if (batchResponses.exists(_.resultCode == Code.CONNECTIONLOSS)) {
      val requestResponsePairs = remainingRequests.zip(batchResponses)

      remainingRequests.clear()
      requestResponsePairs.foreach { case (request, response) =>
        if (response.resultCode == Code.CONNECTIONLOSS)
          remainingRequests += request
        else
          responses += response
      }

      if (remainingRequests.nonEmpty)
        zooKeeperClient.waitUntilConnected()
    } else {
      remainingRequests.clear()
      responses ++= batchResponses
    }
  }
  responses
}
```
ZooKeeperClient and ZooKeeperClientWatcher
The ZooKeeperClient class initializes the native ZooKeeper object:
```scala
@volatile private var zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
```
The Watcher is, as ever, the core object of ZooKeeper:
```scala
private[zookeeper] object ZooKeeperClientWatcher extends Watcher {
  override def process(event: WatchedEvent): Unit = {
    Option(event.getPath) match {
      case None =>
        val state = event.getState
        stateToMeterMap.get(state).foreach(_.mark())
        inLock(isConnectedOrExpiredLock) {
          isConnectedOrExpiredCondition.signalAll()
        }
        if (state == KeeperState.AuthFailed) {
          error("Auth failed.")
          stateChangeHandlers.values.foreach(_.onAuthFailure())
        } else if (state == KeeperState.Expired) {
          scheduleSessionExpiryHandler()
        }
      case Some(path) =>
        (event.getType: @unchecked) match {
          case EventType.NodeChildrenChanged => zNodeChildChangeHandlers.get(path).foreach(_.handleChildChange())
          case EventType.NodeCreated => zNodeChangeHandlers.get(path).foreach(_.handleCreation())
          case EventType.NodeDeleted => zNodeChangeHandlers.get(path).foreach(_.handleDeletion())
          case EventType.NodeDataChanged => zNodeChangeHandlers.get(path).foreach(_.handleDataChange())
        }
    }
  }
}
```
This reads much like the equivalent Java code. When the event carries no path, Kafka handles the AuthFailed and Expired states; that part is not our focus here.
ZooKeeper uses EventType to represent the four kinds of node events. Kafka keeps a set of handlers for the different events on different nodes; here it looks up the handler by path and invokes it.
Note: don't be misled by the foreach. Option's foreach simply means: if the value is present, apply the given function to it.
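Since a lookup in the handler maps returns an Option, the foreach calls in the match above run a handler only when one is registered for that path. A two-line illustration:

```scala
Some(42).foreach(n => println(s"handler runs: $n"))      // prints "handler runs: 42"
(None: Option[Int]).foreach(n => println(s"never: $n"))  // prints nothing
```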
The handler maps are defined as follows. At this point they are empty; handlers are added later during Kafka startup (for example when the Controller starts):
```scala
private val zNodeChangeHandlers = new ConcurrentHashMap[String, ZNodeChangeHandler]().asScala
private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
```
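Registering a handler is then just an insert into one of these maps, keyed by the znode path the handler declares. Roughly like this (simplified from ZooKeeperClient; exact signatures may differ between Kafka versions):

```scala
// Simplified sketch: registration binds a handler to the path it watches
def registerZNodeChangeHandler(handler: ZNodeChangeHandler): Unit =
  zNodeChangeHandlers.put(handler.path, handler)

def unregisterZNodeChangeHandler(path: String): Unit =
  zNodeChangeHandlers.remove(path)
```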
At this point ZooKeeper is actually fully initialized. But things are not that simple: for event handling Kafka once again reaches for the classic in-memory-queue asynchronous processing pattern, a pattern that shows up all over Kafka.
How Kafka Processes ZooKeeper Events Asynchronously with an In-Memory Queue
Let's use a simple example to show how Kafka processes a ZooKeeper event.
The ZNodeChangeHandler above is just a trait; let's look at one of its implementations:
```scala
class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  override val path: String = ControllerZNode.path

  override def handleCreation(): Unit = eventManager.put(controller.ControllerChange)
  override def handleDeletion(): Unit = eventManager.put(controller.Reelect)
  override def handleDataChange(): Unit = eventManager.put(controller.ControllerChange)
}
```
How It Works
Kafka converts each ZooKeeper node event into an internal event handler, wraps it in a ControllerEvent object, and puts it on an in-memory queue; a dedicated thread then polls the queue and processes the events.
The source of BrokerChange is shown below; the real work sits in process:
```scala
eventManager.put(controller.BrokerChange)

case object BrokerChange extends ControllerEvent {
  override def state: ControllerState = ControllerState.BrokerChange

  override def process(): Unit = {
  }
}
```
So what does eventManager#put actually do?
```scala
private val queue = new LinkedBlockingQueue[ControllerEvent]

def put(event: ControllerEvent): Unit = inLock(putLock) {
  queue.put(event)
}
```
It puts the BrokerChange event into a LinkedBlockingQueue.
Later, when the eventManager starts up, it launches the ControllerEventThread:
```scala
private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)

def start(): Unit = thread.start()

class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
  override def doWork(): Unit = {
    queue.take() match {
      case KafkaController.ShutdownEventThread => initiateShutdown()
      case controllerEvent =>
        _state = controllerEvent.state
        eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs)
        try {
          rateAndTimeMetrics(state).time {
            controllerEvent.process()
          }
        } catch {
          case e: Throwable => error(s"Error processing event $controllerEvent", e)
        }
        try eventProcessedListener(controllerEvent)
        catch {
          case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
        }
        _state = ControllerState.Idle
    }
  }
}
```
Summary
Ever since Kafka's network request processing model, we have kept running into this asynchronous in-memory-queue pattern. It resembles a message queue, except that the queue lives in local memory. Kafka uses the pattern in many places, and learning to spot it is one of the payoffs of reading the source.
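To distill the pattern, here is a minimal, self-contained sketch of the queue-plus-worker-thread model (illustrative names, not Kafka's; assumes Scala 2.12+ for the Runnable lambda):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical minimal version of the pattern: callers enqueue events
// quickly and return; a single worker thread processes them in order.
trait Event { def process(): Unit }
case object Shutdown extends Event { def process(): Unit = () }

class EventManager {
  private val queue = new LinkedBlockingQueue[Event]()

  private val worker = new Thread(() => {
    var running = true
    while (running) {
      queue.take() match {
        case Shutdown => running = false
        case event =>
          // One failing event must not kill the worker thread
          try event.process()
          catch { case e: Throwable => e.printStackTrace() }
      }
    }
  })

  def start(): Unit = worker.start()
  def put(event: Event): Unit = queue.put(event)
  def shutdown(): Unit = { queue.put(Shutdown); worker.join() }
}
```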
Finally, a flow chart to tie it all together: