1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
/**
 * Handles a produce request.
 *
 * The request is first authorized as a whole (transactional id or idempotent write), then each
 * partition is authorized and checked against the metadata cache. Records for the partitions that
 * pass both checks are handed to `replicaManager.appendRecords`; the response is produced by the
 * local `sendResponseCallback`, which merges the append results with the per-partition errors
 * collected here.
 *
 * @param request the produce request; its body is read as a [[ProduceRequest]]
 */
def handleProduceRequest(request: RequestChannel.Request): Unit = {
  val produceRequest = request.body[ProduceRequest]
  // Header size plus body size; this is the value recorded against the produce quota below.
  val numBytesAppended = request.header.toStruct.sizeOf + request.sizeOfBodyInBytes

  // Request-level authorization. A transactional request is only checked against its
  // transactional id; the cluster-level IdempotentWrite check applies to non-transactional
  // idempotent requests.
  if (produceRequest.isTransactional) {
    if (!authorize(request.session, Write, Resource(TransactionalId, produceRequest.transactionalId, LITERAL))) {
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
  } else if (produceRequest.isIdempotent && !authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
    sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
    return
  }

  // Partition-level triage: each partition lands in exactly one of these three maps.
  val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
  val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()

  for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) {
    // The metadata cache is consulted only for partitions that passed topic authorization.
    if (!authorize(request.session, Write, Resource(Topic, topicPartition.topic, LITERAL)))
      unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
    else if (!metadataCache.contains(topicPartition))
      nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
    else
      authorizedRequestInfo += (topicPartition -> memoryRecords)
  }

  /**
   * Builds and sends the produce response.
   *
   * @param responseStatus per-partition results for the partitions that were appended; the
   *                       rejected partitions collected above are merged in before responding
   */
  def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
    val mergedResponseStatus = responseStatus ++ unauthorizedTopicResponses ++ nonExistingTopicResponses
    val errorInResponse = mergedResponseStatus.exists { case (_, status) => status.error != Errors.NONE }

    mergedResponseStatus.foreach { case (topicPartition, status) =>
      if (status.error != Errors.NONE) {
        debug("Produce request with correlation id %d from client %s on partition %s failed due to %s".format(
          request.header.correlationId,
          request.header.clientId,
          topicPartition,
          status.error.exceptionName))
      }
    }

    // Stamp the request with the time at which this callback runs.
    request.apiRemoteCompleteTimeNanos = time.nanoseconds

    // Record against the produce (bandwidth) quota and, unless acks == 0, the request quota.
    // If either yields a positive throttle time, the quota manager with the strictly larger
    // value throttles; on a tie the request quota manager is used.
    val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds())
    val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request)
    val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
    if (maxThrottleTimeMs > 0) {
      if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
        quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse)
      } else {
        quotas.request.throttle(request, requestThrottleTimeMs, sendResponse)
      }
    }

    if (produceRequest.acks == 0) {
      // With acks == 0 no ProduceResponse is sent back. On error the connection is closed
      // instead - presumably so the client, which is not waiting for a response, can notice
      // the failure (NOTE(review): confirm against the client's handling).
      if (errorInResponse) {
        val exceptionsSummary = mergedResponseStatus.map { case (topicPartition, status) =>
          topicPartition -> status.error.exceptionName
        }.mkString(", ")
        info(
          s"Closing connection due to error during produce request with correlation id ${request.header.correlationId} " +
            s"from client id ${request.header.clientId} with ack=0\n" +
            s"Topic and partition to exceptions: $exceptionsSummary"
        )
        closeConnection(request, new ProduceResponse(mergedResponseStatus.asJava).errorCounts)
      } else {
        sendNoOpResponseExemptThrottle(request)
      }
    } else {
      sendResponse(request, Some(new ProduceResponse(mergedResponseStatus.asJava, maxThrottleTimeMs)), None)
    }
  }

  /** Forwards each partition's record conversion statistics to `updateRecordConversionStats`. */
  def processingStatsCallback(processingStats: FetchResponseStats): Unit = {
    processingStats.foreach { case (tp, info) =>
      updateRecordConversionStats(request, tp, info)
    }
  }

  if (authorizedRequestInfo.isEmpty)
    // Nothing to append: respond straight away with only the per-partition errors.
    sendResponseCallback(Map.empty)
  else {
    // Internal topics are writable only when the client id is the admin client id.
    val internalTopicsAllowed = request.header.clientId == AdminUtils.AdminClientId

    replicaManager.appendRecords(
      timeout = produceRequest.timeout.toLong,
      requiredAcks = produceRequest.acks,
      internalTopicsAllowed = internalTopicsAllowed,
      isFromClient = true,
      entriesPerPartition = authorizedRequestInfo,
      responseCallback = sendResponseCallback,
      recordConversionStatsCallback = processingStatsCallback)

    // NOTE(review): presumably drops the request's reference to the record buffers now that
    // they have been handed to appendRecords, so they are not retained with the request - confirm.
    produceRequest.clearPartitionRecords()
  }
}
|