1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
   | 1.本节点的aliveNodes和aliveBrokers记录,更新为最新的记录 2.对于要删除的tp,从缓存中删除,并记录下来作为这个方法的返回 3.对于其他的tp,addOrUpdatePartitionInfo
  def updateMetadata(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {     inWriteLock(partitionMetadataLock) {
               val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataRequest.PartitionState]](metadataSnapshot.partitionStates.size)       metadataSnapshot.partitionStates.foreach { case (topic, oldPartitionStates) =>         val copy = new mutable.LongMap[UpdateMetadataRequest.PartitionState](oldPartitionStates.size)         copy ++= oldPartitionStates         partitionStates += (topic -> copy)       }              val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size)       val controllerId = updateMetadataRequest.controllerId match {           case id if id < 0 => None           case id => Some(id)         }
        updateMetadataRequest.liveBrokers.asScala.foreach { broker =>                                    val nodes = new java.util.HashMap[ListenerName, Node]         val endPoints = new mutable.ArrayBuffer[EndPoint]         broker.endPoints.asScala.foreach { ep =>           endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)           nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))         }         aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))         aliveNodes(broker.id) = nodes.asScala       }       aliveNodes.get(brokerId).foreach { listenerMap =>         val listeners = listenerMap.keySet         if (!aliveNodes.values.forall(_.keySet == listeners))           error(s"Listeners are not identical across brokers: $aliveNodes")       }
        val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]       updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>         val controllerId = updateMetadataRequest.controllerId         val controllerEpoch = updateMetadataRequest.controllerEpoch         if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {                      removePartitionInfo(partitionStates, tp.topic, tp.partition)           stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +             s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")           deletedPartitions += tp         } else {                      addOrUpdatePartitionInfo(partitionStates, tp.topic, tp.partition, info)           stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +             s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")         }       }       metadataSnapshot = MetadataSnapshot(partitionStates, controllerId, aliveBrokers, aliveNodes)       deletedPartitions     } }
   |