/**
 * Handles a FindCoordinator request.
 *
 * For a GROUP coordinator the key is mapped to a partition of the group-metadata
 * internal topic via `groupCoordinator.partitionFor`, the topic's metadata is fetched
 * (the helper name suggests the topic is created on demand), and the leader node of that
 * partition is returned as the coordinator. If the topic metadata carries an error, or
 * the partition has no usable leader, the response is `COORDINATOR_NOT_AVAILABLE`.
 *
 * NOTE(review): this listing is an abridged excerpt. The authorization check and the
 * transaction-coordinator branch were elided by the original author; their placeholders
 * are kept verbatim below and are not compilable as written.
 *
 * @param request the incoming request; its body is read as a `FindCoordinatorRequest`
 */
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
  val findCoordinatorRequest = request.body[FindCoordinatorRequest]

  // Placeholder kept from the excerpt ("认证失败" roughly means "authentication/authorization
  // failed"); the real condition and its error response are not shown here.
  if (认证失败...) {
    // omitted ...
  } else {
    // get metadata (and create the topic if necessary)
    val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
      case FindCoordinatorRequest.CoordinatorType.GROUP =>
        // Compute the partition that the coordinator key maps to.
        val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
        // What we get back here is the metadata for ALL partitions of the topic.
        val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
        (partition, metadata)

      // Placeholder kept from the excerpt ("如果是__transaction_state" means
      // "if it is __transaction_state"); the transaction-coordinator handling is elided.
      case 如果是__transaction_state =>
        // handling omitted ...

      case _ =>
        throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
    }

    def createResponse(requestThrottleMs: Int): AbstractResponse = {
      val responseBody = if (topicMetadata.error != Errors.NONE) {
        new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
      } else {
        // Look up the node hosting the leader of the partition computed above.
        // Option(...) guards against a null leader.
        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
          .find(_.partition == partition)
          .flatMap(partitionMetadata => Option(partitionMetadata.leader))

        // Return the leader if there is a non-empty one, otherwise report an error.
        coordinatorEndpoint match {
          case Some(endpoint) if !endpoint.isEmpty =>
            new FindCoordinatorResponse(requestThrottleMs, Errors.NONE, endpoint)
          case _ =>
            new FindCoordinatorResponse(requestThrottleMs, Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
        }
      }
      responseBody
    }

    // Send the response.
    sendResponseMaybeThrottle(request, createResponse)
  }
}
/**
 * Builds the per-partition metadata for `topic` as seen through `listenerName`.
 *
 * For every partition found in `cache` the leader, replica, ISR and offline-replica
 * endpoints are resolved and an error code is chosen:
 *  - no leader endpoint and the leader broker is not in `aliveBrokers`: `LEADER_NOT_AVAILABLE`
 *  - no leader endpoint but the leader broker is alive (so it lacks this listener):
 *    `LISTENER_NOT_FOUND` if `errorUnavailableListeners`, else `LEADER_NOT_AVAILABLE`
 *  - fewer replica or ISR endpoints resolved than broker ids listed: `REPLICA_NOT_AVAILABLE`
 *  - otherwise: `NONE`
 *
 * @param topic                     topic to look up in `cache`
 * @param listenerName              listener whose endpoints are resolved
 * @param errorUnavailableEndpoints forwarded unchanged to `getEndpoints`
 * @param errorUnavailableListeners selects `LISTENER_NOT_FOUND` over `LEADER_NOT_AVAILABLE`
 *                                  when the leader is alive but has no endpoint for the listener
 * @return the `cache.get(topic)` result mapped to per-partition metadata; empty when the
 *         cache has no entry for the topic
 */
private def getPartitionMetadata(topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
                                 errorUnavailableListeners: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
  cache.get(topic).map { partitions =>
    partitions.map { case (partitionId, partitionState) =>
      val topicPartition = TopicAndPartition(topic, partitionId)
      val leaderBrokerId = partitionState.basePartitionState.leader
      // May be empty.
      val maybeLeader = getAliveEndpoint(leaderBrokerId, listenerName)
      val replicas = partitionState.basePartitionState.replicas.asScala.map(_.toInt)
      // Nodes hosting the replicas.
      val replicaInfo = getEndpoints(replicas, listenerName, errorUnavailableEndpoints)
      // Node information for the offline replicas.
      val offlineReplicaInfo = getEndpoints(partitionState.offlineReplicas.asScala.map(_.toInt), listenerName,
        errorUnavailableEndpoints)

      maybeLeader match {
        case None =>
          // Check the LEADER's liveness, not some other broker id: only when the leader
          // broker is alive can the missing endpoint be blamed on the listener, which is
          // what the second log message (and LISTENER_NOT_FOUND) reports.
          val error = if (!aliveBrokers.contains(leaderBrokerId)) { // we are already holding the read lock
            debug(s"Error while fetching metadata for $topicPartition: leader not available")
            Errors.LEADER_NOT_AVAILABLE
          } else {
            debug(s"Error while fetching metadata for $topicPartition: listener $listenerName not found on leader $leaderBrokerId")
            if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
          }
          new MetadataResponse.PartitionMetadata(error, partitionId, Node.noNode(),
            replicaInfo.asJava, java.util.Collections.emptyList(), offlineReplicaInfo.asJava)

        case Some(leader) =>
          val isr = partitionState.basePartitionState.isr.asScala.map(_.toInt)
          // Node information for the in-sync replicas.
          val isrInfo = getEndpoints(isr, listenerName, errorUnavailableEndpoints)

          // Pick the error code first; the metadata object is otherwise identical in all
          // three outcomes.
          val error =
            if (replicaInfo.size < replicas.size) {
              // Replica information is incomplete.
              debug(s"Error while fetching metadata for $topicPartition: replica information not available for " +
                s"following brokers ${replicas.filterNot(replicaInfo.map(_.id).contains).mkString(",")}")
              Errors.REPLICA_NOT_AVAILABLE
            } else if (isrInfo.size < isr.size) {
              // ISR information is incomplete.
              debug(s"Error while fetching metadata for $topicPartition: in sync replica information not available for " +
                s"following brokers ${isr.filterNot(isrInfo.map(_.id).contains).mkString(",")}")
              Errors.REPLICA_NOT_AVAILABLE
            } else {
              // Leader, ISR, replica and offline-replica information are all available.
              Errors.NONE
            }

          new MetadataResponse.PartitionMetadata(error, partitionId, leader, replicaInfo.asJava,
            isrInfo.asJava, offlineReplicaInfo.asJava)
      }
    }
  }
}