/**
 * Resolves a timestamp to an offset for a partition hosted on this broker.
 *
 * Looks up the local log for `topicPartition` and delegates the search to the log's
 * `fetchOffsetsByTimestamp`. The timestamp is passed through unchanged, so any sentinel
 * values are interpreted by the log, not here.
 *
 * @param topicPartition the partition whose local log should be searched
 * @param timestamp      the target timestamp (or sentinel) forwarded to the log
 * @return the result of the log's timestamp search
 * @throws UnknownTopicOrPartitionException if this broker has no log for `topicPartition`
 */
private def fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset] = {
  replicaManager.getLog(topicPartition) match {
    case Some(log) =>
      // Search the log's segments for the offset matching the requested timestamp.
      log.fetchOffsetsByTimestamp(timestamp)
    case None =>
      throw new UnknownTopicOrPartitionException(s"$topicPartition does not exist on the broker.")
  }
}
def fetchOffsetsByTimestamp(targetTimestamp: Long): Option[TimestampOffset] = { maybeHandleIOException(s"Error while fetching offset by timestamp for $topicPartition in dir ${dir.getParent}") { // 所有LogSegment的副本,共享变私有,避免锁竞争 valsegmentsCopy= logSegments.toBuffer // For the earliest and latest, we do not need to return the timestamp. if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP) // earliest返回logStartOffset:当前TP在日志自动清理后,目前最小的offset return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset)) elseif (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP) // latest返回LEO 但是为什么返回LEO呢,万一一直没提交呢,返回HW不是更稳妥吗 return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
// earliest,latest之外的类型:Timestamp表示具体的时间戳,-1,-2只是表示了2个特殊的offset valtargetSeg= { // Get all the segments whose largest timestamp is smaller than target timestamp // 先找segments,找第一个Segment的最大Timestamp大于请求中的Timestamp,可以看下takeWhile源码 valearlierSegs= segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp) // takeWhile牛逼啊,一直循环,只要不满足表示式停止 // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one. // 再找offset if (earlierSegs.length < segmentsCopy.length) Some(segmentsCopy(earlierSegs.length)) else None }
def findOffsetByTimestamp(timestamp: Long, startingOffset: Long = baseOffset): Option[TimestampOffset] = { // Get the index entry with a timestamp less than or equal to the target timestamp