1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| PartitionStateMachine 记录着集群中所有 Partition 的状态信息，并决定每个 Partition 当前处于什么状态、以及可以转变为什么状态。
/**
 * Starts the partition state machine.
 *
 * First seeds the state of every partition through `initializePartitionState()`, then
 * invokes `triggerOnlinePartitionStateChange()`, and finally logs the resulting
 * `partitionState`.
 */
def startup(): Unit = {
  info("Initializing partition state")
  // Establish the initial state of each partition before any transition is attempted.
  initializePartitionState()
  info("Triggering online partition state changes")
  // NOTE(review): the no-argument overload called here is not part of this excerpt.
  // Presumably it attempts leader election for every partition currently in the
  // NewPartition or OfflinePartition state -- confirm against its definition.
  triggerOnlinePartitionStateChange()
  info(s"Started partition state machine with initial state -> $partitionState")
}
/**
 * Derives the initial state of every partition in `controllerContext.allPartitions`.
 *
 * The state is chosen from `controllerContext.partitionLeadershipInfo`:
 *  - leadership info exists and the leader replica is online  -> OnlinePartition
 *  - leadership info exists but the leader replica is offline -> OfflinePartition
 *  - no leadership info for the partition                     -> NewPartition
 *
 * Every partition is transitioned from NonExistentPartition via `changeStateTo`
 * (presumably this is also what populates the cached `partitionState` -- confirm
 * against `changeStateTo`, which is defined outside this excerpt).
 */
private def initializePartitionState(): Unit = {
  for (topicPartition <- controllerContext.allPartitions) {
    // Explicit type annotation keeps the inferred type from widening past PartitionState.
    val initialState: PartitionState =
      controllerContext.partitionLeadershipInfo.get(topicPartition) match {
        case Some(currentLeaderIsrAndEpoch) =>
          // Leadership info is present: the state depends on whether the leader is alive.
          val leader = currentLeaderIsrAndEpoch.leaderAndIsr.leader
          if (controllerContext.isReplicaOnline(leader, topicPartition)) OnlinePartition
          else OfflinePartition
        case None =>
          // No leader-and-ISR information exists yet, so the partition is brand new.
          NewPartition
      }
    changeStateTo(topicPartition, NonExistentPartition, initialState)
  }
}
/**
 * Tries to move partitions to the OnlinePartition state.
 *
 * From the supplied map, selects every partition that is currently in the
 * OfflinePartition or NewPartition state and whose topic is not queued up for
 * deletion, then hands them to `handleStateChanges` with
 * `OfflinePartitionLeaderElectionStrategy`.
 *
 * According to the original author's note, this is triggered after a controller
 * election or when brokers come online / go offline.
 *
 * @param partitionState current state of each candidate partition
 */
def triggerOnlinePartitionStateChange(partitionState: Map[TopicPartition, PartitionState]): Unit = {
  // The pattern variable is named `state` so that it does not shadow the
  // `partitionState` parameter.
  val partitionsToTrigger = partitionState.filter { case (partition, state) =>
    !topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic) &&
      (state == OfflinePartition || state == NewPartition)
  }.keys.toSeq

  handleStateChanges(partitionsToTrigger, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
  // TODO: If handleStateChanges catches an exception, it is not enough to bail out and log an error.
  // It is important to trigger leader election for those partitions.
}
/**
 * Moves the given partitions towards `targetState` inside a single broker request batch.
 *
 * Does nothing when `partitions` is empty. Any `Throwable` raised while the batch is
 * being built or sent is logged and not rethrown.
 *
 * @param partitions                         partitions whose state should change
 * @param targetState                        state the partitions should be moved to
 * @param partitionLeaderElectionStrategyOpt optional leader election strategy forwarded
 *                                           to `doHandleStateChanges`; defaults to `None`
 */
def handleStateChanges(partitions: Seq[TopicPartition],
                       targetState: PartitionState,
                       partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = {
  // The three steps below must run in exactly this order within one batch.
  def applyInBatch(): Unit = {
    controllerBrokerRequestBatch.newBatch()
    // Per the original author's note: tries to elect a leader for partitions in the
    // OfflinePartition or NewPartition state and moves them to OnlinePartition on success.
    doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt)
    // Per the original author's note: sends the queued requests (LeaderAndIsr and
    // UpdateMetadata) to the brokers.
    controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
  }

  if (partitions.nonEmpty) {
    try applyInBatch()
    catch {
      // NOTE(review): this deliberately catches Throwable, which also covers fatal
      // JVM errors; consider whether scala.util.control.NonFatal would be more appropriate.
      case e: Throwable =>
        error(s"Error while moving some partitions to $targetState state", e)
    }
  }
}
|