Class SubscriptionState

java.lang.Object
org.apache.kafka.clients.consumer.internals.SubscriptionState

public class SubscriptionState extends Object
A class for tracking the topics, partitions, and offsets for the consumer. A partition is "assigned" either directly with assignFromUser(Set) (manual assignment) or with assignFromSubscribed(Collection) (automatic assignment from subscription).

Once assigned, the partition is not considered "fetchable" until its initial position has been set with seekValidated(TopicPartition, FetchPosition). Fetchable partitions track a position, which is the last offset that has been returned to the user. You can suspend fetching from a partition through pause(TopicPartition) without affecting the consumed position. The partition will remain unfetchable until resume(TopicPartition) is called. You can also query the pause state independently with isPaused(TopicPartition).

Note that neither the pause state nor the consumed positions are preserved when the partition assignment is changed, whether directly by the user or through a group rebalance.

Thread Safety: this class is thread-safe.
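
The lifecycle described above (assigned, then fetchable once a position is set, with pause and resume toggling fetchability) can be illustrated with a toy model. This is a minimal sketch and not the Kafka implementation: the class PartitionLifecycleSketch, its method names, and the use of plain strings as partition keys are all hypothetical. It also takes the note about reassignment literally and drops every position and pause flag whenever the assignment is replaced, which is an assumption about the simplest reading of that note.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Toy model of the assign/seek/pause lifecycle; NOT the Kafka implementation. */
class PartitionLifecycleSketch {
    /** Hypothetical per-partition state: a position (null until set) and a pause flag. */
    private static final class State {
        Long position;   // null means no initial position has been set yet
        boolean paused;
    }

    private final Map<String, State> assignment = new HashMap<>();

    /** Replaces the assignment. Assumption: all previous positions and pause flags are dropped. */
    void assign(Set<String> partitions) {
        assignment.clear();
        for (String p : partitions) {
            assignment.put(p, new State());
        }
    }

    void seek(String p, long offset) { state(p).position = offset; }
    void pause(String p)             { state(p).paused = true; }
    void resume(String p)            { state(p).paused = false; }
    boolean isPaused(String p)       { return state(p).paused; }
    Long position(String p)          { return state(p).position; }

    /** Fetchable means: assigned, has a position, and is not paused. */
    boolean isFetchable(String p) {
        State s = assignment.get(p);
        return s != null && s.position != null && !s.paused;
    }

    private State state(String p) {
        State s = assignment.get(p);
        if (s == null) {
            throw new IllegalStateException("No current assignment for partition " + p);
        }
        return s;
    }

    public static void main(String[] args) {
        PartitionLifecycleSketch s = new PartitionLifecycleSketch();
        s.assign(Set.of("orders-0"));
        System.out.println("fetchable before seek: " + s.isFetchable("orders-0"));
        s.seek("orders-0", 42L);
        s.pause("orders-0");
        // Pausing blocks fetching but leaves the position untouched.
        System.out.println("fetchable while paused: " + s.isFetchable("orders-0")
                + ", position: " + s.position("orders-0"));
        s.resume("orders-0");
        System.out.println("fetchable after resume: " + s.isFetchable("orders-0"));
    }
}
```

In this sketch the pause flag and the position are independent fields, which mirrors the statement that pausing does not affect the consumed position and that the pause state can be queried separately.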

  • Constructor Details

    • SubscriptionState

      public SubscriptionState(org.apache.kafka.common.utils.LogContext logContext, OffsetResetStrategy defaultResetStrategy)
  • Method Details

    • toString

      public String toString()
      Overrides:
      toString in class Object
    • prettyString

      public String prettyString()
    • subscribe

      public boolean subscribe(Set<String> topics, Optional<ConsumerRebalanceListener> listener)
    • subscribe

      public void subscribe(Pattern pattern, Optional<ConsumerRebalanceListener> listener)
    • subscribeFromPattern

      public boolean subscribeFromPattern(Set<String> topics)
    • assignFromUser

      public boolean assignFromUser(Set<org.apache.kafka.common.TopicPartition> partitions)
      Change the assignment to the specified partitions provided by the user. Note that this is different from assignFromSubscribed(Collection), whose input partitions are provided from the subscribed topics.
    • checkAssignmentMatchedSubscription

      public boolean checkAssignmentMatchedSubscription(Collection<org.apache.kafka.common.TopicPartition> assignments)
      Returns:
      true if the assignments match the subscription, otherwise false
    • assignFromSubscribed

      public void assignFromSubscribed(Collection<org.apache.kafka.common.TopicPartition> assignments)
      Change the assignment to the specified partitions returned from the coordinator. Note that this is different from assignFromUser(Set), which directly sets the assignment from user inputs.
    • hasNoSubscriptionOrUserAssignment

      public boolean hasNoSubscriptionOrUserAssignment()
    • unsubscribe

      public void unsubscribe()
    • subscription

      public Set<String> subscription()
    • pausedPartitions

      public Set<org.apache.kafka.common.TopicPartition> pausedPartitions()
    • seekValidated

      public void seekValidated(org.apache.kafka.common.TopicPartition tp, SubscriptionState.FetchPosition position)
    • seek

      public void seek(org.apache.kafka.common.TopicPartition tp, long offset)
    • seekUnvalidated

      public void seekUnvalidated(org.apache.kafka.common.TopicPartition tp, SubscriptionState.FetchPosition position)
    • assignedPartitions

      public Set<org.apache.kafka.common.TopicPartition> assignedPartitions()
      Returns:
      a modifiable copy of the currently assigned partitions
    • assignedPartitionsList

      public List<org.apache.kafka.common.TopicPartition> assignedPartitionsList()
      Returns:
      a modifiable copy of the currently assigned partitions as a list
    • fetchablePartitions

      public List<org.apache.kafka.common.TopicPartition> fetchablePartitions(Predicate<org.apache.kafka.common.TopicPartition> isAvailable)
    • hasAutoAssignedPartitions

      public boolean hasAutoAssignedPartitions()
    • position

      public void position(org.apache.kafka.common.TopicPartition tp, SubscriptionState.FetchPosition position)
    • maybeValidatePositionForCurrentLeader

      public boolean maybeValidatePositionForCurrentLeader(org.apache.kafka.clients.ApiVersions apiVersions, org.apache.kafka.common.TopicPartition tp, org.apache.kafka.clients.Metadata.LeaderAndEpoch leaderAndEpoch)
      Enter the offset validation state if the leader for this partition is known to support a usable version of the OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation.
      Parameters:
      apiVersions - supported API versions
      tp - topic partition to validate
      leaderAndEpoch - current leader and leader epoch of the topic partition
      Returns:
      true if we enter the offset validation state
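
The branch described for maybeValidatePositionForCurrentLeader can be sketched as a small decision function. This is a simplified illustration under stated assumptions, not the actual method: the class ValidationDecisionSketch, the enum constants, and the Optional<Short> parameter (a stand-in for looking up a usable OffsetsForLeaderEpoch version in ApiVersions) are hypothetical, and any other conditions the real method may check are ignored.

```java
import java.util.Optional;

/** Toy model of the validation decision; NOT the Kafka implementation. */
class ValidationDecisionSketch {
    /** Hypothetical subset of fetch states, just enough for this illustration. */
    enum FetchState { FETCHING, AWAIT_VALIDATION }

    /**
     * @param usableVersion hypothetical stand-in for the ApiVersions lookup: the usable
     *        OffsetsForLeaderEpoch version on the leader, or empty if there is none
     * @return the state the partition moves to
     */
    static FetchState nextState(Optional<Short> usableVersion) {
        // A usable API version means the position can be validated against the leader;
        // otherwise validation is simply treated as complete and fetching continues.
        return usableVersion.isPresent() ? FetchState.AWAIT_VALIDATION : FetchState.FETCHING;
    }

    /** Mirrors the documented return value: true only if the validation state is entered. */
    static boolean maybeValidate(Optional<Short> usableVersion) {
        return nextState(usableVersion) == FetchState.AWAIT_VALIDATION;
    }

    public static void main(String[] args) {
        System.out.println("leader supports API: " + maybeValidate(Optional.of((short) 3)));
        System.out.println("leader lacks API:    " + maybeValidate(Optional.empty()));
    }
}
```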
    • maybeCompleteValidation

      public Optional<SubscriptionState.LogTruncation> maybeCompleteValidation(org.apache.kafka.common.TopicPartition tp, SubscriptionState.FetchPosition requestPosition, org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset)
      Attempt to complete validation with the end offset returned from the OffsetForLeaderEpoch request.
      Returns:
      Log truncation details if detected and no reset policy is defined.
    • awaitingValidation

      public boolean awaitingValidation(org.apache.kafka.common.TopicPartition tp)
    • completeValidation

      public void completeValidation(org.apache.kafka.common.TopicPartition tp)
    • validPosition

      public SubscriptionState.FetchPosition validPosition(org.apache.kafka.common.TopicPartition tp)
    • position

      public SubscriptionState.FetchPosition position(org.apache.kafka.common.TopicPartition tp)
    • partitionLag

      public Long partitionLag(org.apache.kafka.common.TopicPartition tp, org.apache.kafka.common.IsolationLevel isolationLevel)
    • partitionEndOffset

      public Long partitionEndOffset(org.apache.kafka.common.TopicPartition tp, org.apache.kafka.common.IsolationLevel isolationLevel)
    • requestPartitionEndOffset

      public void requestPartitionEndOffset(org.apache.kafka.common.TopicPartition tp)
    • partitionEndOffsetRequested

      public boolean partitionEndOffsetRequested(org.apache.kafka.common.TopicPartition tp)
    • updatePreferredReadReplica

      public void updatePreferredReadReplica(org.apache.kafka.common.TopicPartition tp, int preferredReadReplicaId, LongSupplier timeMs)
      Set the preferred read replica with a lease timeout. After this time, the replica will no longer be valid and preferredReadReplica(TopicPartition, long) will return an empty result.
      Parameters:
      tp - The topic partition
      preferredReadReplicaId - The preferred read replica
      timeMs - Supplies the time at which this preferred replica is no longer valid
    • preferredReadReplica

      public Optional<Integer> preferredReadReplica(org.apache.kafka.common.TopicPartition tp, long timeMs)
      Get the preferred read replica.
      Parameters:
      tp - The topic partition
      timeMs - The current time
      Returns:
      the current preferred read replica, if it has been set and has not expired
    • clearPreferredReadReplica

      public Optional<Integer> clearPreferredReadReplica(org.apache.kafka.common.TopicPartition tp)
      Unset the preferred read replica. This causes the fetcher to go back to the leader for fetches.
      Parameters:
      tp - The topic partition
      Returns:
      the removed preferred read replica if set, an empty Optional otherwise.
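
The lease semantics of the three preferred read replica methods (set with an expiry, read while unexpired, clear explicitly) can be sketched with a small standalone class. This is a minimal sketch, not the Kafka implementation: ReplicaLeaseSketch and its method names are hypothetical, it tracks a single partition, and it assumes that the supplier yields an absolute expiry timestamp and that the replica stops being valid strictly after that timestamp.

```java
import java.util.Optional;
import java.util.function.LongSupplier;

/** Toy model of a preferred read replica with a lease; NOT the Kafka implementation. */
class ReplicaLeaseSketch {
    private Integer replicaId;   // null when no preferred replica is set
    private long expireTimeMs;   // only meaningful while replicaId is non-null

    /** Sets the replica. Assumption: the supplier returns the absolute expiry time. */
    void update(int id, LongSupplier expiry) {
        this.replicaId = id;
        this.expireTimeMs = expiry.getAsLong();
    }

    /** Returns the replica if set and unexpired. Assumption: expiry is strictly after expireTimeMs. */
    Optional<Integer> get(long nowMs) {
        if (replicaId != null && nowMs > expireTimeMs) {
            replicaId = null;   // lease expired: forget the replica
        }
        return Optional.ofNullable(replicaId);
    }

    /** Unsets the replica and returns the one that was removed, if any. */
    Optional<Integer> clear() {
        Optional<Integer> removed = Optional.ofNullable(replicaId);
        replicaId = null;
        return removed;
    }

    public static void main(String[] args) {
        ReplicaLeaseSketch lease = new ReplicaLeaseSketch();
        lease.update(2, () -> 1_000L);
        System.out.println("within lease: " + lease.get(999L));
        System.out.println("after lease:  " + lease.get(1_001L));
        lease.update(3, () -> 5_000L);
        System.out.println("cleared:      " + lease.clear());
    }
}
```

Once the lease has expired or the replica has been cleared, a caller in this sketch gets an empty Optional and would fall back to fetching from the leader.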
    • allConsumed

      public Map<org.apache.kafka.common.TopicPartition,OffsetAndMetadata> allConsumed()
    • requestOffsetReset

      public void requestOffsetReset(org.apache.kafka.common.TopicPartition partition, OffsetResetStrategy offsetResetStrategy)
    • requestOffsetReset

      public void requestOffsetReset(Collection<org.apache.kafka.common.TopicPartition> partitions, OffsetResetStrategy offsetResetStrategy)
    • requestOffsetReset

      public void requestOffsetReset(org.apache.kafka.common.TopicPartition partition)
    • isOffsetResetNeeded

      public boolean isOffsetResetNeeded(org.apache.kafka.common.TopicPartition partition)
    • resetStrategy

      public OffsetResetStrategy resetStrategy(org.apache.kafka.common.TopicPartition partition)
    • hasAllFetchPositions

      public boolean hasAllFetchPositions()
    • initializingPartitions

      public Set<org.apache.kafka.common.TopicPartition> initializingPartitions()
    • resetInitializingPositions

      public void resetInitializingPositions()
    • partitionsNeedingReset

      public Set<org.apache.kafka.common.TopicPartition> partitionsNeedingReset(long nowMs)
    • partitionsNeedingValidation

      public Set<org.apache.kafka.common.TopicPartition> partitionsNeedingValidation(long nowMs)
    • isAssigned

      public boolean isAssigned(org.apache.kafka.common.TopicPartition tp)
    • isPaused

      public boolean isPaused(org.apache.kafka.common.TopicPartition tp)
    • hasValidPosition

      public boolean hasValidPosition(org.apache.kafka.common.TopicPartition tp)
    • pause

      public void pause(org.apache.kafka.common.TopicPartition tp)
    • markPendingRevocation

      public void markPendingRevocation(Set<org.apache.kafka.common.TopicPartition> tps)
    • resume

      public void resume(org.apache.kafka.common.TopicPartition tp)
    • rebalanceListener

      public Optional<ConsumerRebalanceListener> rebalanceListener()