| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 
 | 1.readFromLocalLog(): 从本地日志拉取相应的数据2.判断Fetch请求来源,如果来自副本同步,那么更新该副本的the end offset记录,如果该副本不在isr中,判断是否需要更新isr
 3.返回结果,满足条件立即返回,否则通过延迟操作,延迟返回结果
 
 
 def fetchMessages(timeout: Long,
 replicaId: Int,
 fetchMinBytes: Int,
 fetchMaxBytes: Int,
 hardMaxBytesLimit: Boolean,
 fetchInfos: Seq[(TopicPartition, PartitionData)],
 quota: ReplicaQuota = UnboundedQuota,
 responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
 isolationLevel: IsolationLevel) {
 
 val isFromFollower = Request.isValidBrokerId(replicaId)
 
 val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId
 
 
 
 val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId)
 FetchLogEnd
 else if (isolationLevel == IsolationLevel.READ_COMMITTED)
 FetchTxnCommitted
 else
 FetchHighWatermark
 
 
 def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
 
 val result = readFromLocalLog(
 replicaId = replicaId,
 fetchOnlyFromLeader = fetchOnlyFromLeader,
 fetchIsolation = fetchIsolation,
 fetchMaxBytes = fetchMaxBytes,
 hardMaxBytesLimit = hardMaxBytesLimit,
 readPartitionInfo = fetchInfos,
 quota = quota)
 
 if (isFromFollower) updateFollowerLogReadResults(replicaId, result)
 else result
 }
 
 val logReadResults = readFromLog()
 
 
 val logReadResultValues = logReadResults.map { case (_, v) => v }
 val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
 val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) =>
 errorIncurred || (readResult.error != Errors.NONE))
 
 
 
 
 
 
 if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
 val fetchPartitionData = logReadResults.map { case (tp, result) =>
 tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
 result.lastStableOffset, result.info.abortedTransactions)
 }
 responseCallback(fetchPartitionData)
 } else {
 
 
 val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
 val fetchInfo = fetchInfos.collectFirst {
 case (tp, v) if tp == topicPartition => v
 }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
 (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
 }
 val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader,
 fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
 val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
 
 
 val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
 
 
 
 
 delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
 }
 }
 
 |