1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| 可以了解KafkaProducer是怎么初始化一个Sender对象的
int maxInflightRequests = configureInflightRequests(producerConfig, transactionManager != null); config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION); return new Sender(logContext, client, metadata, this.accumulator, maxInflightRequests == 1, producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), acks, retries, metricsRegistry.senderMetrics, time, requestTimeoutMs, producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), this.transactionManager, apiVersions);
public Sender(LogContext logContext, KafkaClient client, Metadata metadata, RecordAccumulator accumulator, boolean guaranteeMessageOrder,// 为true保证顺序性 int maxRequestSize, short acks, int retries, SenderMetricsRegistry metricsRegistry, Time time, int requestTimeoutMs, long retryBackoffMs, TransactionManager transactionManager, ApiVersions apiVersions) { this.log = logContext.logger(Sender.class); this.client = client; this.accumulator = accumulator; this.metadata = metadata; this.guaranteeMessageOrder = guaranteeMessageOrder; this.maxRequestSize = maxRequestSize; this.running = true; this.acks = acks; this.retries = retries; this.time = time; this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time); this.requestTimeoutMs = requestTimeoutMs; this.retryBackoffMs = retryBackoffMs; this.apiVersions = apiVersions; this.transactionManager = transactionManager; this.inFlightBatches = new HashMap<>(); }
|