1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| 1.readFromLocalLog(): 从本地日志拉取相应的数据 2.判断Fetch请求来源,如果来自副本同步,那么更新该副本的the end offset记录,如果该副本不在isr中,判断是否需要更新isr 3.返回结果,满足条件立即返回,否则通过延迟操作,延迟返回结果
def fetchMessages(timeout: Long, replicaId: Int, fetchMinBytes: Int, fetchMaxBytes: Int, hardMaxBytesLimit: Boolean, fetchInfos: Seq[(TopicPartition, PartitionData)], quota: ReplicaQuota = UnboundedQuota, responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit, isolationLevel: IsolationLevel) { val isFromFollower = Request.isValidBrokerId(replicaId) val fetchOnlyFromLeader = replicaId != Request.DebuggingConsumerId && replicaId != Request.FutureLocalReplicaId val fetchIsolation = if (isFromFollower || replicaId == Request.FutureLocalReplicaId) FetchLogEnd else if (isolationLevel == IsolationLevel.READ_COMMITTED) FetchTxnCommitted else FetchHighWatermark
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = { val result = readFromLocalLog( replicaId = replicaId, fetchOnlyFromLeader = fetchOnlyFromLeader, fetchIsolation = fetchIsolation, fetchMaxBytes = fetchMaxBytes, hardMaxBytesLimit = hardMaxBytesLimit, readPartitionInfo = fetchInfos, quota = quota) if (isFromFollower) updateFollowerLogReadResults(replicaId, result) else result }
val logReadResults = readFromLog()
val logReadResultValues = logReadResults.map { case (_, v) => v } val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum val errorReadingData = logReadResultValues.foldLeft(false) ((errorIncurred, readResult) => errorIncurred || (readResult.error != Errors.NONE))
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) { val fetchPartitionData = logReadResults.map { case (tp, result) => tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records, result.lastStableOffset, result.info.abortedTransactions) } responseCallback(fetchPartitionData) } else { val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) => val fetchInfo = fetchInfos.collectFirst { case (tp, v) if tp == topicPartition => v }.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos")) (topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo)) } val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit, fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus) val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys) } }
|