Class ConsumerCoordinator

java.lang.Object
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
All Implemented Interfaces:
Closeable, AutoCloseable

public final class ConsumerCoordinator extends AbstractCoordinator
This class manages the coordination process with the consumer coordinator.
  • Constructor Details

    • ConsumerCoordinator

      public ConsumerCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, List<ConsumerPartitionAssignor> assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?,?> interceptors, boolean throwOnFetchStableOffsetsUnsupported, String rackId, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter)
      Initialize the coordination manager.
  • Method Details

    • protocolType

      public String protocolType()
      Description copied from class: AbstractCoordinator
      Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
      Specified by:
      protocolType in class AbstractCoordinator
      Returns:
      Non-null protocol type name
    • metadata

      protected org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()
      Description copied from class: AbstractCoordinator
      Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
      Specified by:
      metadata in class AbstractCoordinator
      Returns:
      Non-empty map of supported protocols and metadata
    • updatePatternSubscription

      public void updatePatternSubscription(org.apache.kafka.common.Cluster cluster)
    • onJoinComplete

      protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer)
      Description copied from class: AbstractCoordinator
      Invoked when a group member has successfully joined a group. If this call fails with an exception, then it will be retried using the same assignment state on the next call to AbstractCoordinator.ensureActiveGroup().
      Specified by:
      onJoinComplete in class AbstractCoordinator
      Parameters:
      generation - The generation that was joined
      memberId - The identifier for the local member in the group
      assignmentStrategy - The protocol selected by the coordinator
      assignmentBuffer - The assignment propagated from the group leader
    • poll

      public boolean poll(org.apache.kafka.common.utils.Timer timer, boolean waitForJoinGroup)
      Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.

      Returns early if the timeout expires or if waiting on rejoin is not required

      Parameters:
      timer - Timer bounding how long this method can block
      waitForJoinGroup - Boolean flag indicating if we should wait until re-join group completes
      Returns:
      true iff the operation succeeded
      Throws:
      org.apache.kafka.common.KafkaException - if the rebalance callback throws an exception
    • timeToNextPoll

      public long timeToNextPoll(long now)
      Return the time to the next needed invocation of ConsumerNetworkClient.poll(Timer).
      Parameters:
      now - current time in milliseconds
      Returns:
      the maximum time in milliseconds the caller should wait before the next invocation of poll()
    • onLeaderElected

      protected Map<String,ByteBuffer> onLeaderElected(String leaderId, String assignmentStrategy, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions, boolean skipAssignment)
      Description copied from class: AbstractCoordinator
      Invoked when the leader is elected. This is used by the leader to perform the assignment if necessary and to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)
      Specified by:
      onLeaderElected in class AbstractCoordinator
      Parameters:
      leaderId - The id of the leader (which is this member)
      assignmentStrategy - The protocol selected by the coordinator
      allSubscriptions - Metadata from all members of the group
      skipAssignment - True if leader must skip running the assignor
      Returns:
      A map from each member to their state assignment
    • onJoinPrepare

      protected boolean onJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId)
      Description copied from class: AbstractCoordinator
      Invoked prior to each group join or rejoin. This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer)
      Specified by:
      onJoinPrepare in class AbstractCoordinator
      Parameters:
      timer - Timer bounding how long this method can block
      generation - The previous generation or -1 if there was none
      memberId - The identifier of this member in the previous group or "" if there was none
      Returns:
      true If onJoinPrepare async commit succeeded, false otherwise
    • onLeavePrepare

      public void onLeavePrepare()
      Description copied from class: AbstractCoordinator
      Invoked prior to each leave group event. This is typically used to cleanup assigned partitions; note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would not trigger it even if it tries to force leaving group upon heartbeat session expiration)
      Overrides:
      onLeavePrepare in class AbstractCoordinator
    • rejoinNeededOrPending

      public boolean rejoinNeededOrPending()
      Description copied from class: AbstractCoordinator
      Check whether the group should be rejoined (e.g. if metadata changes) or whether a rejoin request is already in flight and needs to be completed.
      Overrides:
      rejoinNeededOrPending in class AbstractCoordinator
      Returns:
      true if it should, false otherwise
      Throws:
      org.apache.kafka.common.KafkaException - if the callback throws exception
    • initWithCommittedOffsetsIfNeeded

      public boolean initWithCommittedOffsetsIfNeeded(org.apache.kafka.common.utils.Timer timer)
      Refresh the committed offsets for partitions that require initialization.
      Parameters:
      timer - Timer bounding how long this method can block
      Returns:
      true iff the operation completed within the timeout
    • fetchCommittedOffsets

      public Map<org.apache.kafka.common.TopicPartition,OffsetAndMetadata> fetchCommittedOffsets(Set<org.apache.kafka.common.TopicPartition> partitions, org.apache.kafka.common.utils.Timer timer)
      Fetch the current committed offsets from the coordinator for a set of partitions.
      Parameters:
      partitions - The partitions to fetch offsets for
      Returns:
      A map from partition to the committed offset or null if the operation timed out
    • groupMetadata

      public ConsumerGroupMetadata groupMetadata()
      Return the consumer group metadata.
      Returns:
      the current consumer group metadata
    • close

      public void close(org.apache.kafka.common.utils.Timer timer)
      Overrides:
      close in class AbstractCoordinator
      Throws:
      org.apache.kafka.common.KafkaException - if the rebalance callback throws exception
    • commitOffsetsAsync

      public RequestFuture<Void> commitOffsetsAsync(Map<org.apache.kafka.common.TopicPartition,OffsetAndMetadata> offsets, OffsetCommitCallback callback)
    • commitOffsetsSync

      public boolean commitOffsetsSync(Map<org.apache.kafka.common.TopicPartition,OffsetAndMetadata> offsets, org.apache.kafka.common.utils.Timer timer)
      Commit offsets synchronously. This method will retry until the commit completes successfully or an unrecoverable error is encountered.
      Parameters:
      offsets - The offsets to be committed
      Returns:
      If the offset commit was successfully sent and a successful response was received from the coordinator
      Throws:
      org.apache.kafka.common.errors.AuthorizationException - if the consumer is not authorized to the group or to any of the specified partitions. See the exception for more details
      CommitFailedException - if an unrecoverable error occurs before the commit can be completed
      org.apache.kafka.common.errors.FencedInstanceIdException - if a static member gets fenced
    • maybeAutoCommitOffsetsAsync

      public void maybeAutoCommitOffsetsAsync(long now)