Class RecordAccumulator
java.lang.Object
org.apache.kafka.clients.producer.internals.RecordAccumulator
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.
The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
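The bounded-memory blocking behavior can be illustrated with a minimal sketch. BoundedAccumulator and its fields are hypothetical stand-ins (the real class blocks on its BufferPool); here a Semaphore models the byte budget:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a queue with a byte budget where append blocks
// (up to a timeout) when the budget is exhausted, loosely mirroring how
// RecordAccumulator blocks on its BufferPool.
class BoundedAccumulator {
    private final Semaphore memory;  // available byte budget
    private final Queue<byte[]> records = new ConcurrentLinkedQueue<>();

    BoundedAccumulator(int totalMemoryBytes) {
        this.memory = new Semaphore(totalMemoryBytes);
    }

    // Blocks up to maxTimeToBlockMs for memory; returns false on timeout.
    boolean append(byte[] record, long maxTimeToBlockMs) throws InterruptedException {
        if (!memory.tryAcquire(record.length, maxTimeToBlockMs, TimeUnit.MILLISECONDS))
            return false;  // memory exhausted: caller observes a timeout
        records.add(record);
        return true;
    }

    // Removes one record and returns its bytes to the budget.
    byte[] drainOne() {
        byte[] r = records.poll();
        if (r != null) memory.release(r.length);
        return r;
    }
}
```

When the budget is exhausted, append times out rather than blocking forever, analogous to the producer's max.block.ms bound.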
Nested Class Summary
Nested Classes:
static interface RecordAccumulator.AppendCallbacks
static final class RecordAccumulator.NodeLatencyStats - Node latency stats for each node that are used for adaptive partition distribution. Visible for testing.
static final class RecordAccumulator.PartitionerConfig - Partitioner config for built-in partitioner.
static final class RecordAccumulator.ReadyCheckResult
static final class RecordAccumulator.RecordAppendResult
Constructor Summary
Constructors:
RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, RecordAccumulator.PartitionerConfig partitionerConfig, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
Create a new record accumulator.
RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
Create a new record accumulator with default partitioner config.
Method Summary
Methods:
void abortIncompleteBatches() - This function is only called when sender is closed forcefully.
RecordAccumulator.RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value, org.apache.kafka.common.header.Header[] headers, RecordAccumulator.AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, org.apache.kafka.common.Cluster cluster) - Add a record to the accumulator, return the append result.
void awaitFlushCompletion() - Mark all partitions as ready to send and block until the send is complete.
void beginFlush() - Initiate the flushing of data from the accumulator; this makes all requests immediately ready.
void close() - Close this accumulator and force all the record buffers to be drained.
void deallocate(ProducerBatch batch) - Deallocate the record batch.
Map<Integer,List<ProducerBatch>> drain(org.apache.kafka.clients.Metadata metadata, Set<org.apache.kafka.common.Node> nodes, int maxSize, long now) - Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis.
List<ProducerBatch> expiredBatches(long now) - Get a list of batches which have been sitting in the accumulator too long and need to be expired.
BuiltInPartitioner getBuiltInPartitioner(String topic)
long getDeliveryTimeoutMs()
Deque<ProducerBatch> getDeque(org.apache.kafka.common.TopicPartition tp)
RecordAccumulator.NodeLatencyStats getNodeLatencyStats(Integer nodeId)
boolean hasIncomplete() - Check whether there are any pending batches (whether sent or unsent).
boolean hasUndrained() - Check whether there are any batches which haven't been drained.
void maybeUpdateNextBatchExpiryTime(ProducerBatch batch)
void mutePartition(org.apache.kafka.common.TopicPartition tp)
long nextExpiryTimeMs() - The earliest absolute time a batch will expire (in milliseconds).
RecordAccumulator.ReadyCheckResult ready(org.apache.kafka.clients.Metadata metadata, long nowMs) - Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable partition will be ready; also return a flag for whether there are any unknown leaders for the accumulated partition batches.
void reenqueue(ProducerBatch batch, long now) - Re-enqueue the given record batch in the accumulator.
void resetNextBatchExpiryTime()
int splitAndReenqueue(ProducerBatch bigBatch) - Split the big batch that has been rejected and reenqueue the split batches into the accumulator.
void unmutePartition(org.apache.kafka.common.TopicPartition tp)
void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain)
Constructor Details
RecordAccumulator
public RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, RecordAccumulator.PartitionerConfig partitionerConfig, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
Create a new record accumulator.
Parameters:
logContext - The log context used for logging
batchSize - The size to use when allocating MemoryRecords instances
compression - The compression codec for the records
lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
retryBackoffMs - An artificial delay time to retry the produce request upon receiving an error. This avoids exhausting all retries in a short period of time.
retryBackoffMaxMs - The upper bound of the retry backoff time.
deliveryTimeoutMs - An upper bound on the time to report success or failure on record delivery
partitionerConfig - Partitioner config
metrics - The metrics
metricGrpName - The metric group name
time - The time instance to use
apiVersions - Request API versions for current connected brokers
transactionManager - The shared transaction state object which tracks producer IDs, epochs, and sequence numbers per partition.
bufferPool - The buffer pool
RecordAccumulator
public RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
Create a new record accumulator with default partitioner config.
Parameters:
logContext - The log context used for logging
batchSize - The size to use when allocating MemoryRecords instances
compression - The compression codec for the records
lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
retryBackoffMs - An artificial delay time to retry the produce request upon receiving an error. This avoids exhausting all retries in a short period of time.
retryBackoffMaxMs - The upper bound of the retry backoff time.
deliveryTimeoutMs - An upper bound on the time to report success or failure on record delivery
metrics - The metrics
metricGrpName - The metric group name
time - The time instance to use
apiVersions - Request API versions for current connected brokers
transactionManager - The shared transaction state object which tracks producer IDs, epochs, and sequence numbers per partition.
bufferPool - The buffer pool
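Although RecordAccumulator is internal, these constructor parameters are driven by user-facing producer configuration. As a sketch of the mapping (plain java.util.Properties; no broker or kafka-clients dependency needed):

```java
import java.util.Properties;

// Sketch of the user-facing producer configs that feed these constructor
// arguments: batchSize <- batch.size, compression <- compression.type,
// lingerMs <- linger.ms, retryBackoffMs <- retry.backoff.ms,
// deliveryTimeoutMs <- delivery.timeout.ms, and the BufferPool's total
// size <- buffer.memory.
class ProducerBatchingConfig {
    static Properties props() {
        Properties props = new Properties();
        props.put("batch.size", "16384");           // bytes per MemoryRecords allocation
        props.put("compression.type", "lz4");       // codec applied to each batch
        props.put("linger.ms", "5");                // wait up to 5 ms for more records per batch
        props.put("retry.backoff.ms", "100");       // initial retry delay on errors
        props.put("delivery.timeout.ms", "120000"); // upper bound to report success or failure
        props.put("buffer.memory", "33554432");     // total accumulator memory (32 MiB)
        return props;
    }
}
```

These Properties would typically be passed to a KafkaProducer, which constructs the accumulator internally.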
Method Details
append
public RecordAccumulator.RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value, org.apache.kafka.common.header.Header[] headers, RecordAccumulator.AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, org.apache.kafka.common.Cluster cluster) throws InterruptedException
Add a record to the accumulator, return the append result. The append result will contain the future metadata, and flags for whether the appended batch is full or a new batch was created.
Parameters:
topic - The topic to which this record is being sent
partition - The partition to which this record is being sent, or RecordMetadata.UNKNOWN_PARTITION if any partition could be used
timestamp - The timestamp of the record
key - The key for the record
value - The value for the record
headers - The headers for the record
callbacks - The callbacks to execute
maxTimeToBlock - The maximum time in milliseconds to block for buffer memory to be available
abortOnNewBatch - A boolean that indicates returning before a new batch is created and running the partitioner's onNewBatch method before trying to append again
nowMs - The current time, in milliseconds
cluster - The cluster metadata
Throws:
InterruptedException
resetNextBatchExpiryTime
public void resetNextBatchExpiryTime()
maybeUpdateNextBatchExpiryTime
public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch)
expiredBatches
public List<ProducerBatch> expiredBatches(long now)
Get a list of batches which have been sitting in the accumulator too long and need to be expired.
getDeliveryTimeoutMs
public long getDeliveryTimeoutMs()
reenqueue
public void reenqueue(ProducerBatch batch, long now)
Re-enqueue the given record batch in the accumulator. The Sender.completeBatch method already checks whether the batch has reached deliveryTimeoutMs, so the delivery timeout check is not repeated here.
splitAndReenqueue
public int splitAndReenqueue(ProducerBatch bigBatch)
Split the big batch that has been rejected and reenqueue the split batches into the accumulator.
Returns:
the number of split batches.
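A minimal sketch of the idea, using hypothetical types (the real method works on ProducerBatch and MemoryRecords): split an oversized batch into size-bounded pieces and push them back to the front of the partition's queue so they are retried first:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of split-and-reenqueue: a rejected batch (e.g. one
// that hit MESSAGE_TOO_LARGE) is split into smaller batches that are
// pushed back to the front of the partition's deque for resending.
class BatchSplitter {
    static int splitAndReenqueue(List<byte[]> bigBatch, int maxBatchBytes,
                                 Deque<List<byte[]>> queue) {
        List<List<byte[]>> splits = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int bytes = 0;
        for (byte[] record : bigBatch) {
            // Start a new split once the current one would exceed the limit.
            if (!current.isEmpty() && bytes + record.length > maxBatchBytes) {
                splits.add(current);
                current = new ArrayList<>();
                bytes = 0;
            }
            current.add(record);
            bytes += record.length;
        }
        if (!current.isEmpty()) splits.add(current);
        // Re-enqueue in reverse so the earliest split is drained first.
        for (int i = splits.size() - 1; i >= 0; i--)
            queue.addFirst(splits.get(i));
        return splits.size();  // the number of split batches
    }
}
```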
ready
public RecordAccumulator.ReadyCheckResult ready(org.apache.kafka.clients.Metadata metadata, long nowMs)
Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable partition will be ready. Also return a flag for whether there are any unknown leaders for the accumulated partition batches.
A destination node is ready to send data if:
- There is at least one partition that is not backing off its send
- and those partitions are not muted (to prevent reordering if "max.in.flight.requests.per.connection" is set to one)
- and any of the following are true
- The record set is full
- The record set has sat in the accumulator for at least lingerMs milliseconds
- The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
- The accumulator has been closed
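The conditions above can be sketched as a single predicate. Every parameter is an illustrative stand-in for accumulator or batch state, not the actual implementation (flushInProgress covers the beginFlush() case, which also makes batches immediately ready):

```java
// Illustrative predicate mirroring the readiness rules above; the
// parameters are hypothetical stand-ins for accumulator/batch state.
class BatchReadiness {
    static boolean batchReady(boolean backingOff, boolean muted, boolean full,
                              long waitedMs, long lingerMs, boolean memoryExhausted,
                              boolean closed, boolean flushInProgress) {
        boolean sendable = !backingOff && !muted;      // not backing off, not muted
        boolean lingerExpired = waitedMs >= lingerMs;  // sat in accumulator long enough
        return sendable && (full || lingerExpired || memoryExhausted || closed || flushInProgress);
    }
}
```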
hasUndrained
public boolean hasUndrained()
Check whether there are any batches which haven't been drained.
drain
public Map<Integer,List<ProducerBatch>> drain(org.apache.kafka.clients.Metadata metadata, Set<org.apache.kafka.common.Node> nodes, int maxSize, long now)
Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
Parameters:
metadata - The current cluster metadata
nodes - The set of nodes to drain
maxSize - The maximum number of bytes to drain
now - The current unix time in milliseconds
Returns:
A list of ProducerBatch for each node specified with total size less than the requested maxSize.
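A simplified sketch of the drain loop for one node, with hypothetical types: a rotating start index spreads drains across partition queues so the same queue is not always drained first, which is how the "same topic-node over and over" problem is avoided:

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the per-node drain: take batches from the node's
// partition queues round-robin, stopping once maxSize would be exceeded.
class NodeDrainer {
    private int drainIndex = 0;  // rotates across calls to balance partitions

    List<byte[]> drainNode(List<Deque<byte[]>> partitionQueues, int maxSize) {
        List<byte[]> drained = new ArrayList<>();
        int size = 0;
        int start = drainIndex;
        for (int i = 0; i < partitionQueues.size(); i++) {
            Deque<byte[]> queue = partitionQueues.get((start + i) % partitionQueues.size());
            byte[] batch = queue.peekFirst();
            if (batch == null) continue;
            // Always take at least one batch; otherwise respect the byte cap.
            if (!drained.isEmpty() && size + batch.length > maxSize) break;
            drained.add(queue.pollFirst());
            size += batch.length;
            drainIndex = (start + i + 1) % partitionQueues.size();
        }
        return drained;
    }
}
```

The real drain also skips muted partitions, batches in retry backoff, and (for transactional producers) batches whose sequence numbers cannot yet be assigned.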
updateNodeLatencyStats
public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain)
getNodeLatencyStats
public RecordAccumulator.NodeLatencyStats getNodeLatencyStats(Integer nodeId)
getBuiltInPartitioner
public BuiltInPartitioner getBuiltInPartitioner(String topic)
nextExpiryTimeMs
public long nextExpiryTimeMs()
The earliest absolute time a batch will expire (in milliseconds).
getDeque
public Deque<ProducerBatch> getDeque(org.apache.kafka.common.TopicPartition tp)
deallocate
public void deallocate(ProducerBatch batch)
Deallocate the record batch.
beginFlush
public void beginFlush()
Initiate the flushing of data from the accumulator; this makes all requests immediately ready.
awaitFlushCompletion
public void awaitFlushCompletion() throws InterruptedException
Mark all partitions as ready to send and block until the send is complete.
Throws:
InterruptedException
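The flush handshake can be sketched with a latch. FlushGate is hypothetical; internally, KafkaProducer.flush() drives beginFlush() followed by awaitFlushCompletion():

```java
import java.util.concurrent.CountDownLatch;

// Illustrative flush handshake: beginFlush marks everything as sendable
// and awaitFlushCompletion blocks until all in-flight batches complete.
class FlushGate {
    private final CountDownLatch pending;
    private volatile boolean flushInProgress = false;

    FlushGate(int inFlightBatches) {
        this.pending = new CountDownLatch(inFlightBatches);
    }

    void beginFlush() { flushInProgress = true; }   // all batches now "ready"
    void batchCompleted() { pending.countDown(); }  // invoked by the sender thread

    void awaitFlushCompletion() throws InterruptedException {
        pending.await();          // block until every pending batch completes
        flushInProgress = false;
    }

    boolean flushInProgress() { return flushInProgress; }
}
```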
hasIncomplete
public boolean hasIncomplete()
Check whether there are any pending batches (whether sent or unsent).
abortIncompleteBatches
public void abortIncompleteBatches()
This function is only called when the sender is closed forcefully. It will fail all the incomplete batches and return.
mutePartition
public void mutePartition(org.apache.kafka.common.TopicPartition tp)
unmutePartition
public void unmutePartition(org.apache.kafka.common.TopicPartition tp)
close
public void close()
Close this accumulator and force all the record buffers to be drained.