Every partition assigned to a broker exists on that broker as a Replica object instance, and a replica plays one of two roles in Kafka: leader or follower. As long as a replica is a follower, it continuously fetches data from the partition's leader to stay in sync.

When a LeaderAndIsr request makes the broker's local replica a follower, the broker starts a replica fetcher thread for it:
```scala
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionStates: Map[Partition, LeaderAndIsrRequest.PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Errors]) : Set[Partition] = {
  partitionStates.foreach { case (partition, partitionState) =>
    stateChangeLogger.trace(s"Handling LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $epoch starting the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionState.basePartitionState.leader}")
  }

  for (partition <- partitionStates.keys)
    responseMap.put(partition.topicPartition, Errors.NONE)

  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // 1. Mark each local replica as a follower; skip partitions whose leader is
    //    unchanged or whose new leader is not alive.
    partitionStates.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.basePartitionState.leader
      try {
        metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
          case Some(_) =>
            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
              partitionsToMakeFollower += partition
            else
              stateChangeLogger.info(s"Skipped the become-follower state change after marking its partition as " +
                s"follower with correlation id $correlationId from controller $controllerId epoch $epoch " +
                s"for partition ${partition.topicPartition} (last update " +
                s"controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
                s"since the new leader $newLeaderBrokerId is the same as the old leader")
          case None =>
            stateChangeLogger.error(s"Received LeaderAndIsrRequest with correlation id $correlationId from " +
              s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
              s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) " +
              s"but cannot become follower since the new leader $newLeaderBrokerId is unavailable.")
            partition.getOrCreateReplica(localBrokerId, isNew = partitionStateInfo.isNew)
        }
      } catch {
        case e: KafkaStorageException =>
          stateChangeLogger.error(s"Skipped the become-follower state change with correlation id $correlationId from " +
            s"controller $controllerId epoch $epoch for partition ${partition.topicPartition} " +
            s"(last update controller epoch ${partitionStateInfo.basePartitionState.controllerEpoch}) with leader " +
            s"$newLeaderBrokerId since the replica for the partition is offline due to disk error $e")
          val dirOpt = getLogDir(partition.topicPartition)
          error(s"Error while making broker the follower for partition $partition with leader " +
            s"$newLeaderBrokerId in dir $dirOpt", e)
          responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
      }
    }

    // 2. Stop any existing fetchers for these partitions.
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Stopped fetchers as part of become-follower request from controller $controllerId " +
        s"epoch $epoch with correlation id $correlationId for partition ${partition.topicPartition} with leader " +
        s"${partitionStates(partition).basePartitionState.leader}")
    }

    // 3. Try to complete delayed produce/fetch operations that can no longer
    //    be satisfied now that the partition is a follower.
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }

    partitionsToMakeFollower.foreach { partition =>
      stateChangeLogger.trace(s"Truncated logs and checkpointed recovery boundaries for partition " +
        s"${partition.topicPartition} as part of become-follower request with correlation id $correlationId from " +
        s"controller $controllerId epoch $epoch with leader ${partitionStates(partition).basePartitionState.leader}")
    }

    // 4. Start fetchers against the new leaders, beginning at the local high watermark.
    if (isShuttingDown.get()) {
      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(s"Skipped the adding-fetcher step of the become-follower state " +
          s"change with correlation id $correlationId from controller $controllerId epoch $epoch for " +
          s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader} " +
          "since it is shutting down")
      }
    } else {
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
        val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
          .brokerEndPoint(config.interBrokerListenerName)
        val fetchOffset = partition.localReplicaOrException.highWatermark.messageOffset
        partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
      }.toMap
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

      partitionsToMakeFollower.foreach { partition =>
        stateChangeLogger.trace(s"Started fetcher to new leader as part of become-follower " +
          s"request from controller $controllerId epoch $epoch with correlation id $correlationId for " +
          s"partition ${partition.topicPartition} with leader ${partitionStates(partition).basePartitionState.leader}")
      }
    }
  } catch {
    case e: Throwable =>
      stateChangeLogger.error(s"Error while processing LeaderAndIsr request with correlationId $correlationId " +
        s"received from controller $controllerId epoch $epoch", e)
      throw e
  }

  partitionStates.keys.foreach { partition =>
    stateChangeLogger.trace(s"Completed LeaderAndIsr request correlationId $correlationId from controller $controllerId " +
      s"epoch $epoch for the become-follower transition for partition ${partition.topicPartition} with leader " +
      s"${partitionStates(partition).basePartitionState.leader}")
  }

  partitionsToMakeFollower
}
```
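The core of the sequence above is: stop the old fetcher, truncate local state, then resume fetching from the new leader starting at the high watermark (entries above the high watermark were never confirmed as replicated and may diverge from the new leader). A minimal model of that idea, with the log simplified to a list and all names hypothetical (this is not Kafka code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the become-follower sequence: truncate to the high
 *  watermark, then hand that offset to the fetcher as its start point. */
public class BecomeFollowerModel {
    static long becomeFollower(List<Long> log, long highWatermark) {
        // Entries beyond the HW are unconfirmed and are dropped; the real
        // broker truncates the log and checkpoints recovery boundaries here.
        while (log.size() > highWatermark) {
            log.remove(log.size() - 1);
        }
        // The initial fetch offset handed to the fetcher thread is the HW,
        // mirroring InitialFetchState(leader, leaderEpoch, fetchOffset) above.
        return highWatermark;
    }

    public static void main(String[] args) {
        List<Long> log = new ArrayList<>(Arrays.asList(0L, 1L, 2L, 3L, 4L));
        long fetchOffset = becomeFollower(log, 3);
        System.out.println("log size=" + log.size() + ", fetch from " + fetchOffset);
    }
}
```

The real path is more involved (leader-epoch checks, checkpoint files), but the invariant is the same: a follower never keeps local data past the point it resumes fetching from.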
Note that the broker does not necessarily start one fetcher thread per partition: toward any one leader broker it starts at most num.replica.fetchers threads. Which fetcher thread a given topic-partition lands on is computed from the topic name and partition id:

```scala
private def getFetcherId(topic: String, partitionId: Int): Int = {
  Utils.abs(31 * topic.hashCode() + partitionId) % numFetchers
}
```
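To see how this spreads partitions across fetcher threads, here is a standalone Java sketch of the same hash. The class name and the num.replica.fetchers value are made up for illustration; the sign bit is cleared directly here, whereas the real code delegates that to Kafka's Utils helper:

```java
public class FetcherIdDemo {
    /** Standalone re-implementation of the fetcher-id hash above. */
    static int getFetcherId(String topic, int partitionId, int numFetchers) {
        // Clear the sign bit instead of using Math.abs, which would stay
        // negative for Integer.MIN_VALUE and break the modulo below.
        return ((31 * topic.hashCode() + partitionId) & 0x7fffffff) % numFetchers;
    }

    public static void main(String[] args) {
        int numFetchers = 3; // assumed num.replica.fetchers setting
        for (int p = 0; p < 6; p++) {
            System.out.println("demo-topic/" + p + " -> fetcher "
                + getFetcherId("demo-topic", p, numFetchers));
        }
    }
}
```

Because the partition id is added after the topic hash, consecutive partitions of the same topic generally rotate across the fetcher threads rather than piling onto one.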