主要介绍Controller在启动时注册的这么多监听,各自的用处
监听概览 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler) val childChangeHandlers = Seq (brokerChangeHandler, topicChangeHandler, topicDeletionHandler, logDirEventNotificationHandler, isrChangeNotificationHandler)childChangeHandlers.foreach(zkClient.registerZNodeChildChangeHandler) val nodeChangeHandlers = Seq (preferredReplicaElectionHandler, partitionReassignmentHandler)nodeChangeHandlers.foreach(zkClient.registerZNodeChangeHandlerAndCheckExistence) controllerChangeHandler brokerChangeHandler topicChangeHandler topicDeletionHandler logDirEventNotificationHandler isrChangeNotificationHandler preferredReplicaElectionHandler partitionReassignmentHandler
ZNodeChangeHandler
ControllerChangeHandler 1 2 3 4 5 6 7 8 9 10 def path = "/controller" class ControllerChangeHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChangeHandler { override val path: String = ControllerZNode .path override def handleCreation (): Unit = eventManager.put(controller.ControllerChange ) override def handleDeletion (): Unit = eventManager.put(controller.Reelect ) override def handleDataChange (): Unit = eventManager.put(controller.ControllerChange ) }
PreferredReplicaElectionHandler 1 2 3 4 5 6 7 8 def path = s"${AdminZNode.path} /preferred_replica_election" class PreferredReplicaElectionHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChangeHandler { override val path: String = PreferredReplicaElectionZNode .path override def handleCreation (): Unit = eventManager.put(controller.PreferredReplicaLeaderElection ) }
PartitionReassignmentHandler 1 2 3 4 5 6 7 8 9 10 11 def path = s"${AdminZNode.path} /reassign_partitions" class PartitionReassignmentHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChangeHandler { override val path: String = ReassignPartitionsZNode .path override def handleCreation (): Unit = eventManager.put(controller.PartitionReassignment ) }
ZNodeChildChangeHandler
BrokerChangeHandler 1 2 3 4 5 6 7 8 9 10 11 Broker 上下线变化def path = s"${BrokersZNode.path} /ids" class BrokerChangeHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChildChangeHandler { override val path: String = BrokerIdsZNode .path override def handleChildChange (): Unit = { eventManager.put(controller.BrokerChange ) } }
TopicChangeHandler 1 2 3 4 5 6 7 def path = s"${BrokersZNode.path} /topics" class TopicChangeHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChildChangeHandler { override val path: String = TopicsZNode .path override def handleChildChange (): Unit = eventManager.put(controller.TopicChange ) }
TopicDeletionHandler 1 2 3 4 5 6 7 8 def path = s"${AdminZNode.path} /delete_topics" class TopicDeletionHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChildChangeHandler { override val path: String = DeleteTopicsZNode .path override def handleChildChange (): Unit = eventManager.put(controller.TopicDeletion ) }
LogDirEventNotificationHandler 1 2 3 4 5 6 7 8 def path = "/log_dir_event_notification" class LogDirEventNotificationHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChildChangeHandler { override val path: String = LogDirEventNotificationZNode .path override def handleChildChange (): Unit = eventManager.put(controller.LogDirEventNotification ) }
IsrChangeNotificationHandler 1 2 3 4 5 6 7 8 def path = "/isr_change_notification" class IsrChangeNotificationHandler (controller: KafkaController , eventManager: ControllerEventManager ) extends ZNodeChildChangeHandler { override val path: String = IsrChangeNotificationZNode .path override def handleChildChange (): Unit = eventManager.put(controller.IsrChangeNotification ) }