Class RecordAccumulator

java.lang.Object
org.apache.kafka.clients.producer.internals.RecordAccumulator

public class RecordAccumulator extends Object
This class acts as a queue that accumulates records into MemoryRecords instances to be sent to the server.

The accumulator uses a bounded amount of memory and append calls will block when that memory is exhausted, unless this behavior is explicitly disabled.
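The bounded-memory behavior can be pictured with a minimal sketch. This is not Kafka's BufferPool; it is a hypothetical stand-in that models the same contract: allocations draw from a fixed byte budget and block (up to a timeout) until memory is returned.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Simplified sketch (hypothetical, not Kafka's BufferPool): a fixed memory
// budget where allocations block until memory is released or a timeout expires.
public class BoundedMemorySketch {
    private final Semaphore budget; // one permit per available byte

    public BoundedMemorySketch(int totalBytes) {
        this.budget = new Semaphore(totalBytes);
    }

    // Block up to maxTimeToBlockMs waiting for `bytes` of budget;
    // return false if the memory did not become available in time.
    public boolean allocate(int bytes, long maxTimeToBlockMs) {
        try {
            return budget.tryAcquire(bytes, maxTimeToBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Return `bytes` to the pool, unblocking waiting appenders.
    public void deallocate(int bytes) {
        budget.release(bytes);
    }

    public int availableBytes() {
        return budget.availablePermits();
    }
}
```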

  • Constructor Details

    • RecordAccumulator

      public RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, RecordAccumulator.PartitionerConfig partitionerConfig, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
      Create a new record accumulator
      Parameters:
      logContext - The log context used for logging
      batchSize - The size to use when allocating MemoryRecords instances
      compression - The compression codec for the records
      lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
      retryBackoffMs - An artificial delay time to retry the produce request upon receiving an error. This avoids exhausting all retries in a short period of time.
      retryBackoffMaxMs - The upper bound of the retry backoff time.
      deliveryTimeoutMs - An upper bound on the time to report success or failure on record delivery
      partitionerConfig - Partitioner config
      metrics - The metrics
      metricGrpName - The metric group name
      time - The time instance to use
      apiVersions - Request API versions for current connected brokers
      transactionManager - The shared transaction state object which tracks producer IDs, epochs, and sequence numbers per partition.
      bufferPool - The buffer pool
    • RecordAccumulator

      public RecordAccumulator(org.apache.kafka.common.utils.LogContext logContext, int batchSize, org.apache.kafka.common.record.CompressionType compression, int lingerMs, long retryBackoffMs, long retryBackoffMaxMs, int deliveryTimeoutMs, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpName, org.apache.kafka.common.utils.Time time, org.apache.kafka.clients.ApiVersions apiVersions, TransactionManager transactionManager, BufferPool bufferPool)
      Create a new record accumulator with default partitioner config
      Parameters:
      logContext - The log context used for logging
      batchSize - The size to use when allocating MemoryRecords instances
      compression - The compression codec for the records
      lingerMs - An artificial delay time to add before declaring a records instance that isn't full ready for sending. This allows time for more records to arrive. Setting a non-zero lingerMs will trade off some latency for potentially better throughput due to more batching (and hence fewer, larger requests).
      retryBackoffMs - An artificial delay time to retry the produce request upon receiving an error. This avoids exhausting all retries in a short period of time.
      retryBackoffMaxMs - The upper bound of the retry backoff time.
      deliveryTimeoutMs - An upper bound on the time to report success or failure on record delivery
      metrics - The metrics
      metricGrpName - The metric group name
      time - The time instance to use
      apiVersions - Request API versions for current connected brokers
      transactionManager - The shared transaction state object which tracks producer IDs, epochs, and sequence numbers per partition.
      bufferPool - The buffer pool
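The interaction between retryBackoffMs and retryBackoffMaxMs can be sketched as an exponential backoff that doubles per attempt and is capped at the maximum. This is an illustrative formula, not Kafka's implementation (which also applies random jitter to the computed delay).

```java
// Hypothetical sketch of how retryBackoffMs and retryBackoffMaxMs interact:
// the delay doubles with each attempt and is capped at retryBackoffMaxMs.
// Kafka's real client additionally adds random jitter, omitted here.
public class RetryBackoffSketch {
    public static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int attempts) {
        // Cap the shift to avoid overflow for very large attempt counts.
        long backoff = retryBackoffMs * (1L << Math.min(attempts, 30));
        return Math.min(backoff, retryBackoffMaxMs);
    }
}
```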
  • Method Details

    • append

      public RecordAccumulator.RecordAppendResult append(String topic, int partition, long timestamp, byte[] key, byte[] value, org.apache.kafka.common.header.Header[] headers, RecordAccumulator.AppendCallbacks callbacks, long maxTimeToBlock, boolean abortOnNewBatch, long nowMs, org.apache.kafka.common.Cluster cluster) throws InterruptedException
      Add a record to the accumulator and return the append result

      The append result will contain the future metadata and a flag indicating whether the appended batch is full or a new batch was created

      Parameters:
      topic - The topic to which this record is being sent
      partition - The partition to which this record is being sent or RecordMetadata.UNKNOWN_PARTITION if any partition could be used
      timestamp - The timestamp of the record
      key - The key for the record
      value - The value for the record
      headers - The headers for the record
      callbacks - The callbacks to execute
      maxTimeToBlock - The maximum time in milliseconds to block for buffer memory to be available
      abortOnNewBatch - If true, return before creating a new batch so that the caller can run the partitioner's onNewBatch method before trying the append again
      nowMs - The current time, in milliseconds
      cluster - The cluster metadata
      Throws:
      InterruptedException
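The append path described above can be sketched as follows. This is a hypothetical simplification (byte counts stand in for records, and synchronization is omitted): try the last batch in the partition's deque; if it cannot hold the record, either return to the caller (when abortOnNewBatch is set, so the partitioner can run onNewBatch first) or start a new batch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the append decision: append to the last batch if it
// fits, otherwise either abort (so the caller can rerun the partitioner) or
// open a fresh batch of `batchSize` bytes.
public class AppendSketch {
    static class Batch {
        final int capacity;
        int used;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(int recordSize) {
            if (used + recordSize > capacity) return false;
            used += recordSize;
            return true;
        }
    }

    private final Deque<Batch> deque = new ArrayDeque<>();
    private final int batchSize;

    public AppendSketch(int batchSize) { this.batchSize = batchSize; }

    // Returns true if appended; false if a new batch would be needed
    // and abortOnNewBatch is set.
    public boolean append(int recordSize, boolean abortOnNewBatch) {
        Batch last = deque.peekLast();
        if (last != null && last.tryAppend(recordSize)) return true;
        if (abortOnNewBatch) return false; // let the caller repartition first
        Batch fresh = new Batch(batchSize);
        fresh.tryAppend(recordSize);
        deque.addLast(fresh);
        return true;
    }

    public int batchCount() { return deque.size(); }
}
```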
    • resetNextBatchExpiryTime

      public void resetNextBatchExpiryTime()
    • maybeUpdateNextBatchExpiryTime

      public void maybeUpdateNextBatchExpiryTime(ProducerBatch batch)
    • expiredBatches

      public List<ProducerBatch> expiredBatches(long now)
      Get a list of batches which have been sitting in the accumulator too long and need to be expired.
    • getDeliveryTimeoutMs

      public long getDeliveryTimeoutMs()
    • reenqueue

      public void reenqueue(ProducerBatch batch, long now)
      Re-enqueue the given record batch in the accumulator. In Sender.completeBatch method, we check whether the batch has reached deliveryTimeoutMs or not. Hence we do not do the delivery timeout check here.
    • splitAndReenqueue

      public int splitAndReenqueue(ProducerBatch bigBatch)
      Split the big batch that has been rejected and re-enqueue the split batches into the accumulator.
      Returns:
      the number of split batches.
    • ready

      public RecordAccumulator.ReadyCheckResult ready(org.apache.kafka.clients.Metadata metadata, long nowMs)
      Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable partition will be ready; also return a flag indicating whether any of the accumulated partition batches have unknown leaders.

      A destination node is ready to send data if:

      1. There is at least one partition that is not backing off its send
      2. and those partitions are not muted (to prevent reordering if "max.in.flight.requests.per.connection" is set to one)
      3. and any of the following are true
        • The record set is full
        • The record set has sat in the accumulator for at least lingerMs milliseconds
        • The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions are immediately considered ready).
        • The accumulator has been closed
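The per-batch part of this readiness check can be condensed into a single predicate. The sketch below is illustrative; the parameter names are not Kafka's, and the backoff/mute conditions handled at the node level are left out.

```java
// Hypothetical sketch of the batch-level readiness predicate described above:
// a batch is sendable once it is full, has lingered long enough, memory is
// exhausted, the accumulator is closed, or a flush is in progress.
public class ReadySketch {
    public static boolean batchReady(boolean full, long waitedMs, long lingerMs,
                                     boolean memoryExhausted, boolean closed,
                                     boolean flushInProgress) {
        boolean lingered = waitedMs >= lingerMs;
        return full || lingered || memoryExhausted || closed || flushInProgress;
    }
}
```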
    • hasUndrained

      public boolean hasUndrained()
      Check whether there are any batches which haven't been drained
    • drain

      public Map<Integer,List<ProducerBatch>> drain(org.apache.kafka.clients.Metadata metadata, Set<org.apache.kafka.common.Node> nodes, int maxSize, long now)
      Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
      Parameters:
      metadata - The current cluster metadata
      nodes - The list of nodes to drain
      maxSize - The maximum number of bytes to drain
      now - The current unix time in milliseconds
      Returns:
      A list of ProducerBatch for each node specified with total size less than the requested maxSize.
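The per-node size cap can be sketched as below. This is a hypothetical simplification in which integers stand in for batch byte sizes: batches are collected until adding the next one would exceed maxSize, but at least one batch is always drained so that a single oversized batch cannot stall the queue.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the per-node size cap applied while draining:
// stop before the total would exceed maxSize, but always take at least one
// batch so an oversized batch still makes progress.
public class DrainSketch {
    public static List<Integer> drainOneNode(List<Integer> batchSizes, int maxSize) {
        List<Integer> drained = new ArrayList<>();
        int total = 0;
        for (int size : batchSizes) {
            if (!drained.isEmpty() && total + size > maxSize) break;
            drained.add(size);
            total += size;
        }
        return drained;
    }
}
```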
    • updateNodeLatencyStats

      public void updateNodeLatencyStats(Integer nodeId, long nowMs, boolean canDrain)
    • getNodeLatencyStats

      public RecordAccumulator.NodeLatencyStats getNodeLatencyStats(Integer nodeId)
    • getBuiltInPartitioner

      public BuiltInPartitioner getBuiltInPartitioner(String topic)
    • nextExpiryTimeMs

      public long nextExpiryTimeMs()
      The earliest absolute time a batch will expire (in milliseconds)
    • getDeque

      public Deque<ProducerBatch> getDeque(org.apache.kafka.common.TopicPartition tp)
    • deallocate

      public void deallocate(ProducerBatch batch)
      Deallocate the record batch
    • beginFlush

      public void beginFlush()
      Initiate the flushing of data from the accumulator; this makes all requests immediately ready
    • awaitFlushCompletion

      public void awaitFlushCompletion() throws InterruptedException
      Mark all partitions as ready to send and block until the send is complete
      Throws:
      InterruptedException
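The beginFlush/awaitFlushCompletion handshake can be pictured with a latch-based sketch. This is hypothetical and heavily simplified (the checked InterruptedException is swallowed here, and the real accumulator tracks an incomplete-batch set rather than a fixed count): beginFlush makes every batch sendable, and awaitFlushCompletion blocks until all outstanding batches have completed.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch of the flush handshake: beginFlush() marks everything
// sendable; awaitFlushCompletion() blocks until all pending batches complete.
public class FlushSketch {
    private final CountDownLatch incomplete;
    private volatile boolean flushInProgress = false;

    public FlushSketch(int pendingBatches) {
        this.incomplete = new CountDownLatch(pendingBatches);
    }

    public void beginFlush() { flushInProgress = true; } // all batches now "ready"

    public void batchCompleted() { incomplete.countDown(); }

    public void awaitFlushCompletion() {
        try {
            incomplete.await(); // block until every batch has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        flushInProgress = false;
    }

    public boolean flushInProgress() { return flushInProgress; }
}
```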
    • hasIncomplete

      public boolean hasIncomplete()
      Check whether there are any pending batches (whether sent or unsent).
    • abortIncompleteBatches

      public void abortIncompleteBatches()
      This function is only called when sender is closed forcefully. It will fail all the incomplete batches and return.
    • mutePartition

      public void mutePartition(org.apache.kafka.common.TopicPartition tp)
    • unmutePartition

      public void unmutePartition(org.apache.kafka.common.TopicPartition tp)
    • close

      public void close()
      Close this accumulator and force all the record buffers to be drained