This article covers how a consumer locates its Coordinator before its first message fetch, continuing from the earlier Kafka consumer source-code analysis (Kafka消费者-源码分析) article.
Preface
In ConsumerCoordinator's poll method, we mentioned that on the first poll the consumer needs to join its consumer group; at that point the coordinator is still unknown and has to be obtained from a broker:
```java
public boolean poll(final long timeoutMs) {
    if (coordinatorUnknown()) {
        if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
            return false;
        }
    }
    return true;
}
```
ensureCoordinatorReady is also straightforward; the key call is lookupCoordinator:
```java
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
    while (coordinatorUnknown()) {
        final RequestFuture<Void> future = lookupCoordinator();
        client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));

        if (!future.isDone()) {
            // the lookup has not completed within the timeout; give up for this round
            break;
        }

        if (future.failed()) {
            // failure handling omitted
        }
    }
    return !coordinatorUnknown();
}
```
lookupCoordinator in turn calls sendFindCoordinatorRequest to issue the FindCoordinator request.
The request
The main parameter of FindCoordinatorRequest is the groupId, passed under the name coordinatorKey. Here is the consumer-side code that sends the request:
```java
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    log.debug("Sending FindCoordinator request to broker {}", node);
    FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder(
            FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}
```
The JSON request body looks like this:
```json
{
    "coordinatorKey": "your group id",
    "coordinatorType": 0,
    "minVersion": 0
}
```
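For context, this request is never sent directly by user code; it is triggered the first time a consumer configured with a group.id calls poll. Below is a minimal sketch of such a consumer; the broker address, topic name, and group id are placeholders, not values from the source.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FindCoordinatorDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");  // placeholder broker address
        props.put("group.id", "your group id");            // becomes coordinatorKey in FindCoordinatorRequest
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            // The first poll triggers FindCoordinator, joins the group, then starts fetching records.
            consumer.poll(Duration.ofSeconds(1));
        }
    }
}
```

After the first poll completes, the consumer has discovered its coordinator and joined the group.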
Server-side handling
On the server, locating the Coordinator breaks down into roughly three steps:
1. Compute the partition: the groupId's hashCode modulo 50, where 50 is the number of partitions of __consumer_offsets (see the sketch after this list)
2. Fetch the metadata of all partitions of __consumer_offsets
3. From the metadata fetched in step 2, filter out the metadata of the partition computed in step 1
The server does not look up only the computed partition directly because the method it calls is a general-purpose one; in my view it would not be hard to optimize this.
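Step 1 itself is just a modulo over the groupId's hash. The sketch below mirrors the idea behind groupCoordinator.partitionFor; the class name and the hard-coded 50 (which in reality comes from offsets.topic.num.partitions) are assumptions for illustration.

```java
// Sketch of mapping a groupId to a __consumer_offsets partition.
public final class CoordinatorPartition {

    // Mask off the sign bit instead of Math.abs so Integer.MIN_VALUE cannot yield a negative index.
    public static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // 50 is the default partition count of __consumer_offsets.
        System.out.println(partitionFor("your group id", 50));
    }
}
```

The full server-side handler, handleFindCoordinatorRequest, is shown below.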
```scala
def handleFindCoordinatorRequest(request: RequestChannel.Request) {
  val findCoordinatorRequest = request.body[FindCoordinatorRequest]

  if (/* authorization failed ... */) {
    // error response, omitted
  } else {
    // step 1: compute the partition; step 2: fetch the __consumer_offsets metadata
    val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
      case FindCoordinatorRequest.CoordinatorType.GROUP =>
        val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
        val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
        (partition, metadata)

      case FindCoordinatorRequest.CoordinatorType.TRANSACTION =>
        // same idea for __transaction_state, omitted

      case _ =>
        throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
    }

    def createResponse(requestThrottleMs: Int): AbstractResponse = {
      val responseBody = if (topicMetadata.error != Errors.NONE) {
        new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
      } else {
        // step 3: among all partitions' metadata, find the computed partition and take its leader
        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
          .find(_.partition == partition)
          .map(_.leader)
          .flatMap(p => Option(p))

        coordinatorEndpoint match {
          case Some(endpoint) if !endpoint.isEmpty =>
            new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
          case _ =>
            new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
        }
      }
      responseBody
    }
    sendResponseMaybeThrottle(request, createResponse)
  }
}
```
The FindCoordinatorResponse object returned to the client looks roughly as follows; it mainly carries the error information and the node the Coordinator lives on:
```json
{
    "error": { ... },
    "node": {
        "id": 0,
        "host": "127.0.0.1",
        "port": "9092",
        "rack": "-1"
    }
}
```
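On the client side this response is consumed by the FindCoordinatorResponseHandler composed onto the request future in sendFindCoordinatorRequest above: on success the returned node is remembered as the coordinator, on error the coordinator stays unknown so ensureCoordinatorReady retries. The sketch below only illustrates that idea; the types and field names are simplified assumptions, not the actual client API.

```java
// Illustrative only: cache the coordinator on success, keep it unknown on error so the lookup is retried.
class CoordinatorLookup {

    // Simplified stand-in for FindCoordinatorResponse.
    static final class Response {
        final boolean hasError;
        final String host;
        final int port;

        Response(boolean hasError, String host, int port) {
            this.hasError = hasError;
            this.host = host;
            this.port = port;
        }
    }

    private volatile String coordinator; // "host:port" of the coordinator, null while unknown

    void onFindCoordinatorResponse(Response response) {
        if (response.hasError) {
            coordinator = null; // stay unknown; ensureCoordinatorReady will retry
        } else {
            coordinator = response.host + ":" + response.port;
        }
    }

    boolean coordinatorUnknown() {
        return coordinator == null;
    }
}
```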
Fetching the topic's partition metadata
Via the getOrCreateInternalTopic call chain, getPartitionMetadata is eventually invoked to fetch the metadata of all partitions of the topic, i.e. step 2 above.
Most of this method is checking for abnormal maybeLeader cases; you can jump straight to the last line (the Errors.NONE case).
```scala
private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {
  val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)
  topicMetadata.headOption.getOrElse(createInternalTopic(topic))
}

def getTopicMetadata(topics: Set[String], listenerName: ListenerName,
                     errorUnavailableEndpoints: Boolean = false,
                     errorUnavailableListeners: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
  topics.toSeq.flatMap { topic =>
    getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
      new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
    }
  }
}

private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
                                 errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
  cache.get(topic).map { partitions =>
    partitions.map { case (partitionId, partitionState) =>
      val topicPartition = TopicAndPartition(topic, partitionId)
      val leaderBrokerId = partitionState.basePartitionState.leader
      val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
      val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
      val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
      val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt),
        listenerName, errorUnavailableEndpoints)

      maybeLeader match {
        case None =>
          val error = if (!aliveBrokers.contains(brokerId)) {
            debug(s"Error while fetching metadata for $topicPartition: leader not available")
            Errors.LEADER_NOT_AVAILABLE
          } else {
            debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
            if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
          }
          new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
            replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)

        case Some(leader) =>
          val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
          val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)

          if (replicaInfo.size < replicas.size) {
            debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
              s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
            new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
              replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
          } else if (isrInfo.size < isr.size) {
            debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
              s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
            new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader,
              replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
          } else {
            new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader,
              replicaInfo.asJava, isrInfo.asJava, offlineReplicaInfo.asJava)
          }
      }
    }
  }
}
```
Summary
The FindCoordinatorRequest is a prerequisite step before the consumer fetches messages; it makes sure a coordinator is known. Concretely, the broker uses the consumer's groupId to determine which __consumer_offsets partition it maps to and then fetches that partition's metadata, chiefly the partition leader, the replicas, the ISR set, and the offline replicas.
From the client's point of view, coordinator initialization can be summarized as: using the groupId as the parameter, send the request to the least loaded node (the one with the fewest unfinished requests), and on success initialize the coordinator from the response.
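"Least loaded" here refers to the behaviour of client.leastLoadedNode(), which prefers the broker with the fewest in-flight (unfinished) requests. Below is a simplified illustration of that selection; a plain Map stands in for the client's real bookkeeping, and connection state and backoff checks are left out.

```java
import java.util.List;
import java.util.Map;

// Pick the broker with the fewest in-flight requests; a simplified stand-in for leastLoadedNode().
class LeastLoadedNodeSelector {

    static String leastLoadedNode(List<String> nodes, Map<String, Integer> inFlightRequests) {
        String best = null;
        int fewest = Integer.MAX_VALUE;
        for (String node : nodes) {
            int inFlight = inFlightRequests.getOrDefault(node, 0);
            if (inFlight < fewest) {
                fewest = inFlight;
                best = node;
            }
        }
        return best; // null only if the node list is empty
    }
}
```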