1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| 1.本节点的aliveNodes和aliveBrokers记录,更新为最新的记录 2.对于要删除的tp,从缓存中删除,并记录下来作为这个方法的返回 3.对于其他的tp,addOrUpdatePartitionInfo
def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = { inWriteLock(partitionMetadataLock) {
val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size) metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) => val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size) copy ++= oldPartitionStates partitionStates += (topic -> copy) } val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) val controllerId = updateMetadataRequest.controllerId match { case id if id < 0 => None case id => Some(id) }
updateMetadataRequest.liveBrokers.asScala.foreach { broker => val nodes = new java.util.HashMap[ListenerName, Node] val endPoints = new mutable.ArrayBuffer[EndPoint] broker.endPoints.asScala.foreach { ep => endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol) nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port)) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala } aliveNodes.get(brokerId).foreach { listenerMap => val listeners = listenerMap.keySet if (!aliveNodes.values.forall(_.keySet == listeners)) error(s"Listeners are not identical across brokers: $aliveNodes") }
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) => val controllerId = updateMetadataRequest.controllerId val controllerEpoch = updateMetadataRequest.controllerEpoch if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) { removePartitionInfo(partitionStates, tp.topic, tp.partition) stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " + s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") deletedPartitions += tp } else { addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info) stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " + s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId") } } metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes) deletedPartitions } }
|