1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
|
/**
 * Validates the request and delegates to the rack-unaware or rack-aware
 * assignment routine, depending on which brokers carry rack information.
 *
 * Dispatch rules:
 *  - no broker has a rack: rack-unaware assignment over the broker ids;
 *  - every broker has a rack: rack-aware assignment;
 *  - a mix of both: rejected with an [[AdminOperationException]].
 *
 * @param brokerMetadatas   candidate brokers (id plus optional rack)
 * @param nPartitions       number of partitions to assign; must be > 0
 * @param replicationFactor replicas per partition; must be > 0 and no larger
 *                          than the number of brokers
 * @param fixedStartIndex   forwarded unchanged to the selected assignment routine
 * @param startPartitionId  forwarded unchanged to the selected assignment routine
 * @return the mapping produced by the selected assignment routine
 * @throws InvalidPartitionsException        if `nPartitions <= 0`
 * @throws InvalidReplicationFactorException if `replicationFactor <= 0` or it
 *                                           exceeds the number of brokers
 * @throws AdminOperationException           if only some brokers have rack information
 */
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("Number of partitions must be larger than 0.")
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("Replication factor must be larger than 0.")

  // Only look at the broker list once the scalar arguments have been accepted.
  val availableBrokers = brokerMetadatas.size
  if (replicationFactor > availableBrokers)
    throw new InvalidReplicationFactorException(s"Replication factor: $replicationFactor larger than available brokers: $availableBrokers.")

  // Split once into brokers without / with rack information.
  val (rackless, racked) = brokerMetadatas.partition(_.rack.isEmpty)

  if (racked.isEmpty) {
    val brokerIds = brokerMetadatas.map(_.id)
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerIds, fixedStartIndex, startPartitionId)
  } else if (rackless.nonEmpty) {
    throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment.")
  } else {
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex, startPartitionId)
  }
}
/**
 * Assigns replicas to brokers without considering racks.
 *
 * Partition ids start at `max(0, startPartitionId)` and increase by one per
 * partition. The first replica of each partition is placed round-robin over
 * the broker list, beginning at a start index. The remaining
 * `replicationFactor - 1` replicas are chosen by `replicaIndex` using a shift
 * that grows by one each time the partition id is a positive multiple of the
 * broker count.
 *
 * @param nPartitions       number of partitions to assign
 * @param replicationFactor number of replicas per partition
 * @param brokerList        broker ids to assign over
 * @param fixedStartIndex   when >= 0, used as both the start index and the
 *                          initial shift; otherwise each is drawn independently
 *                          from `rand`
 * @param startPartitionId  id of the first partition; negative values are treated as 0
 * @return map from partition id to its ordered replica list
 */
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val assignment = mutable.Map.empty[Int, Seq[Int]]
  val brokers = brokerList.toArray
  val brokerCount = brokers.length

  // Either the caller-supplied index or a fresh random draw; invoked twice so
  // that the start index and the shift get independent random values.
  def initialValue(): Int =
    if (fixedStartIndex < 0) rand.nextInt(brokerCount) else fixedStartIndex

  val firstBrokerOffset = initialValue()
  val basePartitionId = math.max(0, startPartitionId)
  var shift = initialValue()

  (0 until nPartitions).foreach { n =>
    val partitionId = basePartitionId + n

    // Bump the shift once per full pass over the broker list.
    if (partitionId > 0 && partitionId % brokerCount == 0)
      shift += 1

    val firstIdx = (partitionId + firstBrokerOffset) % brokerCount
    val replicas = mutable.ArrayBuffer(brokers(firstIdx))
    replicas ++= (0 until replicationFactor - 1).iterator.map { j =>
      brokers(replicaIndex(firstIdx, shift, j, brokerCount))
    }

    assignment.put(partitionId, replicas)
  }

  assignment
}
/**
 * Computes the broker-array index for a non-first replica of a partition.
 *
 * The result is `firstReplicaIndex` advanced by a shift in `[1, nBrokers - 1]`
 * (modulo `nBrokers`), so for `0 <= firstReplicaIndex < nBrokers` it is a valid
 * index that never equals `firstReplicaIndex`.
 *
 * The shift is derived with a floor modulus over a `Long` sum. A truncating
 * `%` on the `Int` sum would yield a shift <= 0 when `secondReplicaShift` is
 * negative or the sum wraps (for example after the caller's shift counter
 * passes `Int.MaxValue`), which produces either the first replica's own index
 * or a negative index.
 *
 * @param firstReplicaIndex  index of the partition's first replica in the broker array
 * @param secondReplicaShift base shift applied to the partition's remaining replicas
 * @param replicaIndex       zero-based position among the remaining replicas
 * @param nBrokers           number of brokers; expected to be at least 2
 * @return index into the broker array for the requested replica
 * @throws ArithmeticException if `nBrokers` is 1, because the modulus `nBrokers - 1` is zero
 */
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // Widen before adding so the sum cannot wrap; floorMod keeps the offset
  // in [0, nBrokers - 2] even for negative inputs.
  val offset = java.lang.Math.floorMod(secondReplicaShift.toLong + replicaIndex.toLong, (nBrokers - 1).toLong).toInt
  val shift = 1 + offset
  (firstReplicaIndex + shift) % nBrokers
}
|