private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest]
val (authorizedRequestInfo, unauthorizedRequestInfo) = offsetRequest.partitionTimestamps.asScala.partition { case (topicPartition, _) => authorize(request.session, Describe, Resource(Topic, topicPartition.topic, LITERAL)) }
val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) })
val responseMap = authorizedRequestInfo.map { case (topicPartition, timestamp) => // 获取leader val localReplica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
// -1表示consumer val fromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID
val found = if (fromConsumer) { // 根据事务隔离级别,获取可拉取的位移 val lastFetchableOffset = offsetRequest.isolationLevel match { case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset // 默认没使用事务,返回的是highWatermark case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset }
/**
 * Looks up the offset that corresponds to `timestamp` in the log of `topicPartition`.
 *
 * The lookup itself is delegated to `fetchOffsetsByTimestamp` on the log returned by
 * `replicaManager.getLog`; this method only resolves the log and reports a missing one.
 *
 * @param topicPartition partition whose log is searched
 * @param timestamp      target timestamp, passed through unchanged to the log lookup
 * @return the result of the log's `fetchOffsetsByTimestamp` for `timestamp`
 * @throws UnknownTopicOrPartitionException if `replicaManager` has no log for the partition
 */
private def fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
  replicaManager.getLog(topicPartition) match {
    case Some(log) =>
      // Search across the log's segments for the offset matching the timestamp.
      log.fetchOffsetsByTimestamp(timestamp)
    case None =>
      throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
  }
}
def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = { maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { // 所有LogSegment的副本,共享变私有,避免锁竞争 val segmentsCopy = logSegments.toBuffer // For the earliest and latest, we do not need to return the timestamp. if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) // earliest返回logStartOffset:当前TP在日志自动清理后,目前最小的offset return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset)) elseif (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) // latest返回LEO 但是为什么返回LEO呢,万一一直没提交呢,返回HW不是更稳妥吗 return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
// earliest,latest之外的类型:Timestamp表示具体的时间戳,-1,-2只是表示了2个特殊的offset val targetSeg = { // Get all the segments whose largest timestamp is smaller than target timestamp // 先找segments,找第一个Segment的最大Timestamp大于请求中的Timestamp,可以看下takeWhile源码 val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) // takeWhile牛逼啊,一直循环,只要不满足表示式停止 // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. // 再找offset if (earlierSegs.length < segmentsCopy.length) Some(segmentsCopy(earlierSegs.length)) else None }
def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = { // Get the index entry with a timestamp less than or equal to the target timestamp
val timestampOffset = timeIndex.lookup(timestamp) val position = offsetIndex.lookup(math.max(timestampOffset.offset, startingOffset)).position