1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/**
 * Computes the new leader and ISR for a batch of offline partitions.
 *
 * The partitions are first split by whether at least one replica of their current ISR is
 * online. For partitions without a live in-sync replica, the topic's log config is fetched via
 * `zkClient.getLogConfigs` to read `uncleanLeaderElectionEnable`; if that fetch failed for the
 * topic, the failure is reported through `logFailedStateChange` and no leader is elected for
 * the partition. Partitions that still have a live in-sync replica are always processed with
 * unclean leader election disabled.
 *
 * @param leaderIsrAndControllerEpochs the partitions to process, each paired with its current
 *                                     leader/ISR state
 * @return one triple per input partition: the partition, the new leader and ISR (`None` when
 *         the topic's log config could not be fetched or the election produced no leader), and
 *         the assigned replicas that are currently online. Entries for partitions without a
 *         live in-sync replica come first, followed by the remaining partitions.
 */
private def leaderForOffline(leaderIsrAndControllerEpochs: Seq[(TopicPartition, LeaderIsrAndControllerEpoch)]): Seq[(TopicPartition, Option[LeaderAndIsr], Seq[Int])] = {
  // Split on whether any replica of the current ISR is online.
  val (partitionsWithNoLiveInSyncReplicas, partitionsWithLiveInSyncReplicas) =
    leaderIsrAndControllerEpochs.partition { case (partition, leaderIsrAndControllerEpoch) =>
      !leaderIsrAndControllerEpoch.leaderAndIsr.isr.exists(replica => controllerContext.isReplicaOnline(replica, partition))
    }

  // `logConfigs` and `failed` are only ever looked up by topic name below, so each topic is
  // requested once even when several of its partitions are being processed.
  val topicsToLookUp = partitionsWithNoLiveInSyncReplicas.map { case (partition, _) => partition.topic }.distinct
  val (logConfigs, failed) = zkClient.getLogConfigs(topicsToLookUp, config.originals())

  val partitionsWithUncleanLeaderElectionState = partitionsWithNoLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
    if (failed.contains(partition.topic)) {
      // Without the topic config we cannot tell whether unclean election is allowed: report
      // the failure and carry `None` so that no election is attempted for this partition.
      logFailedStateChange(partition, partitionState(partition), OnlinePartition, failed(partition.topic))
      (partition, None, false)
    } else {
      (partition, Option(leaderIsrAndControllerEpoch), logConfigs(partition.topic).uncleanLeaderElectionEnable.booleanValue())
    }
  } ++ partitionsWithLiveInSyncReplicas.map { case (partition, leaderIsrAndControllerEpoch) =>
    // A live in-sync replica exists, so unclean election is never enabled here.
    (partition, Option(leaderIsrAndControllerEpoch), false)
  }

  partitionsWithUncleanLeaderElectionState.map { case (partition, leaderIsrAndControllerEpochOpt, uncleanLeaderElectionEnabled) =>
    val assignment = controllerContext.partitionReplicaAssignment(partition)
    val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))
    // `flatMap` yields `None` both when the config lookup failed above and when the election
    // algorithm does not return a leader.
    val newLeaderAndIsrOpt = leaderIsrAndControllerEpochOpt.flatMap { leaderIsrAndControllerEpoch =>
      val isr = leaderIsrAndControllerEpoch.leaderAndIsr.isr
      val leaderOpt = PartitionLeaderElectionAlgorithms.offlinePartitionLeaderElection(
        assignment, isr, liveReplicas.toSet, uncleanLeaderElectionEnabled, controllerContext)
      leaderOpt.map { leader =>
        // A leader taken from the ISR keeps the online part of that ISR; a leader from
        // outside the ISR starts a new ISR containing only itself.
        val newIsr =
          if (isr.contains(leader)) isr.filter(replica => controllerContext.isReplicaOnline(replica, partition))
          else List(leader)
        leaderIsrAndControllerEpoch.leaderAndIsr.newLeaderAndIsr(leader, newIsr)
      }
    }
    (partition, newLeaderAndIsrOpt, liveReplicas)
  }
}
|