Kafka源码系列之十六各Handler详解

本文主要介绍Controller在启动时注册的各个监听器(Handler),以及它们各自的用处。

监听概览

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Excerpt from KafkaController.Startup: the controller-change handler is
// registered through the "check existence" variant.
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)

// Excerpt from KafkaController.onControllerFailover(): the remaining handlers
// are registered in two groups, child-change handlers first.
val childChangeHandlers = Seq(
  brokerChangeHandler,
  topicChangeHandler,
  topicDeletionHandler,
  logDirEventNotificationHandler,
  isrChangeNotificationHandler
)
childChangeHandlers.foreach(handler => zkClient.registerZNodeChildChangeHandler(handler))

// Node-change handlers, registered through the "check existence" variant.
val nodeChangeHandlers = Seq(
  preferredReplicaElectionHandler,
  partitionReassignmentHandler
)
nodeChangeHandlers.foreach(handler => zkClient.registerZNodeChangeHandlerAndCheckExistence(handler))

controllerChangeHandler
brokerChangeHandler
topicChangeHandler
topicDeletionHandler
logDirEventNotificationHandler
isrChangeNotificationHandler
preferredReplicaElectionHandler
partitionReassignmentHandler

ZNodeChangeHandler

1
ZK节点变化监听

ControllerChangeHandler

1
2
3
4
5
6
7
8
9
10
// Excerpt: presumably the definition that ControllerZNode.path resolves to — confirm.
def path = "/controller"

/**
 * Node-change handler for the controller znode (`ControllerZNode.path`).
 *
 * Every callback only enqueues an event on `eventManager`:
 *  - creation and data change put `controller.ControllerChange`
 *  - deletion puts `controller.Reelect`
 *
 * @param controller   controller whose event objects are enqueued
 * @param eventManager event manager the events are put on
 */
class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  override val path: String = ControllerZNode.path

  override def handleCreation(): Unit = {
    eventManager.put(controller.ControllerChange)
  }

  override def handleDeletion(): Unit = {
    eventManager.put(controller.Reelect)
  }

  override def handleDataChange(): Unit = {
    eventManager.put(controller.ControllerChange)
  }
}

PreferredReplicaElectionHandler

1
2
3
4
5
6
7
8
// Excerpt: presumably the definition that PreferredReplicaElectionZNode.path resolves to — confirm.
def path = s"${AdminZNode.path}/preferred_replica_election"

/**
 * Node-change handler for the preferred-replica-election znode
 * (`PreferredReplicaElectionZNode.path`).
 *
 * Only the creation callback is overridden; it puts
 * `controller.PreferredReplicaLeaderElection` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  override val path: String = PreferredReplicaElectionZNode.path

  override def handleCreation(): Unit = {
    eventManager.put(controller.PreferredReplicaLeaderElection)
  }
}

PartitionReassignmentHandler

1
2
3
4
5
6
7
8
9
10
11
// Excerpt: presumably the definition that ReassignPartitionsZNode.path resolves to — confirm.
def path = s"${AdminZNode.path}/reassign_partitions"

/**
 * Node-change handler for the partition-reassignment znode
 * (`ReassignPartitionsZNode.path`).
 *
 * Only the creation callback is overridden; it puts
 * `controller.PartitionReassignment` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class PartitionReassignmentHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler {
  override val path: String = ReassignPartitionsZNode.path

  // Note that the event is also enqueued when the znode is deleted, but we do it explicitly instead of relying on
  // handleDeletion(). This approach is more robust as it doesn't depend on the watcher being re-registered after
  // it's consumed during data changes (we ensure re-registration when the znode is deleted).
  override def handleCreation(): Unit = {
    eventManager.put(controller.PartitionReassignment)
  }
}

ZNodeChildChangeHandler

1
监听ZK子节点信息变化

BrokerChangeHandler

1
2
3
4
5
6
7
8
9
10
11
Broker上下线变化

// Excerpt: presumably the definition that BrokerIdsZNode.path resolves to — confirm.
def path = s"${BrokersZNode.path}/ids"

/**
 * Child-change handler for the broker-ids znode (`BrokerIdsZNode.path`).
 *
 * A child change puts `controller.BrokerChange` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class BrokerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = BrokerIdsZNode.path

  override def handleChildChange(): Unit = eventManager.put(controller.BrokerChange)
}

TopicChangeHandler

1
2
3
4
5
6
7
// Excerpt: presumably the definition that TopicsZNode.path resolves to — confirm.
def path = s"${BrokersZNode.path}/topics"

/**
 * Child-change handler for the topics znode (`TopicsZNode.path`).
 *
 * A child change puts `controller.TopicChange` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class TopicChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = TopicsZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(controller.TopicChange)
  }
}

TopicDeletionHandler

1
2
3
4
5
6
7
8
// Excerpt: presumably the definition that DeleteTopicsZNode.path resolves to — confirm.
def path = s"${AdminZNode.path}/delete_topics"

/**
 * Child-change handler for the delete-topics znode (`DeleteTopicsZNode.path`).
 *
 * A child change puts `controller.TopicDeletion` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class TopicDeletionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = DeleteTopicsZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(controller.TopicDeletion)
  }
}

LogDirEventNotificationHandler

1
2
3
4
5
6
7
8
// Excerpt: presumably the definition that LogDirEventNotificationZNode.path resolves to — confirm.
def path = "/log_dir_event_notification"

/**
 * Child-change handler for the log-dir event notification znode
 * (`LogDirEventNotificationZNode.path`).
 *
 * A child change puts `controller.LogDirEventNotification` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class LogDirEventNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = LogDirEventNotificationZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(controller.LogDirEventNotification)
  }
}

IsrChangeNotificationHandler

1
2
3
4
5
6
7
8
// Excerpt: presumably the definition that IsrChangeNotificationZNode.path resolves to — confirm.
def path = "/isr_change_notification"

/**
 * Child-change handler for the ISR change notification znode
 * (`IsrChangeNotificationZNode.path`).
 *
 * A child change puts `controller.IsrChangeNotification` on `eventManager`.
 *
 * @param controller   controller whose event object is enqueued
 * @param eventManager event manager the event is put on
 */
class IsrChangeNotificationHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  override val path: String = IsrChangeNotificationZNode.path

  override def handleChildChange(): Unit = {
    eventManager.put(controller.IsrChangeNotification)
  }
}