Class OffsetsRequestManager

java.lang.Object
org.apache.kafka.clients.consumer.internals.OffsetsRequestManager
All Implemented Interfaces:
RequestManager, org.apache.kafka.common.ClusterResourceListener

public class OffsetsRequestManager extends Object implements RequestManager, org.apache.kafka.common.ClusterResourceListener
Manager responsible for building the following requests to retrieve partition offsets, and processing its responses.
  • ListOffset request
  • OffsetForLeaderEpoch request
Requests are kept in-memory ready to be sent on the next call to poll(long).
Partition leadership information required to build ListOffset requests is retrieved from the ConsumerMetadata, so this implements ClusterResourceListener to get notified when the cluster metadata is updated.
  • Constructor Details

    • OffsetsRequestManager

      public OffsetsRequestManager(SubscriptionState subscriptionState, ConsumerMetadata metadata, org.apache.kafka.common.IsolationLevel isolationLevel, org.apache.kafka.common.utils.Time time, long retryBackoffMs, long requestTimeoutMs, org.apache.kafka.clients.ApiVersions apiVersions, NetworkClientDelegate networkClientDelegate, BackgroundEventHandler backgroundEventHandler, org.apache.kafka.common.utils.LogContext logContext)
  • Method Details

    • poll

      public NetworkClientDelegate.PollResult poll(long currentTimeMs)
      Determine if there are pending fetch offsets requests to be sent and build a NetworkClientDelegate.PollResult containing it.
      Specified by:
      poll in interface RequestManager
      Parameters:
      currentTimeMs - The current system time at which the method was called; useful for determining if time-sensitive operations should be performed
    • fetchOffsets

      public CompletableFuture<Map<org.apache.kafka.common.TopicPartition,OffsetAndTimestamp>> fetchOffsets(Map<org.apache.kafka.common.TopicPartition,Long> timestampsToSearch, boolean requireTimestamps)
      Retrieve offsets for the given partitions and timestamp. For each partition, this will retrieve the offset of the first message whose timestamp is greater than or equals to the target timestamp.
      Parameters:
      timestampsToSearch - Partitions and target timestamps to get offsets for
      requireTimestamps - True if this should fail with an UnsupportedVersionException if the broker does not support fetching precise timestamps for offsets
      Returns:
      Future containing the map of TopicPartition and OffsetAndTimestamp found .The future will complete when the requests responses are received and processed, following a call to poll(long)
    • resetPositionsIfNeeded

      public CompletableFuture<Void> resetPositionsIfNeeded()
      Reset offsets for all assigned partitions that require it. Offsets will be reset with timestamps according to the reset strategy defined for each partition. This will generate ListOffsets requests for the partitions and timestamps, and enqueue them to be sent on the next call to poll(long).

      When a response is received, positions are updated in-memory, on the subscription state. If an error is received in the response, it will be saved to be thrown on the next call to this function (ex. TopicAuthorizationException)

    • validatePositionsIfNeeded

      public CompletableFuture<Void> validatePositionsIfNeeded()
      Validate positions for all assigned partitions for which a leader change has been detected. This will generate OffsetsForLeaderEpoch requests for the partitions, with the known offset epoch and current leader epoch. It will enqueue the generated requests, to be sent on the next call to poll(long).

      When a response is received, positions are validated and, if a log truncation is detected, a LogTruncationException will be saved in memory, to be thrown on the next call to this function.

    • onUpdate

      public void onUpdate(org.apache.kafka.common.ClusterResource clusterResource)
      Specified by:
      onUpdate in interface org.apache.kafka.common.ClusterResourceListener