Class AbstractCoordinator

java.lang.Object
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
All Implemented Interfaces:
Closeable, AutoCloseable
Direct Known Subclasses:
ConsumerCoordinator

public abstract class AbstractCoordinator extends Object implements Closeable
AbstractCoordinator implements group management for a single group member by interacting with a designated Kafka broker (the coordinator). Group semantics are provided by extending this class. See ConsumerCoordinator for example usage. From a high level, Kafka's group management protocol consists of the following sequence of actions:
  1. Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
  2. Group/Leader Selection: The coordinator selects the members of the group and chooses one member as the leader.
  3. State Assignment: The leader collects the metadata from all the members of the group and assigns state.
  4. Group Stabilization: Each member receives the state assigned by the leader and begins processing.
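The four steps above can be sketched as a toy, in-memory simulation. Everything in it is hypothetical (the ToyCoordinator class, the member and topic names, the "first member is leader" rule, and the assignment strategy); it only illustrates the order of events and is not how the Kafka broker or client is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupProtocolSketch {
    // Step 3: the leader turns everyone's metadata into per-member state.
    // Assumption: each topic goes to the interested member that currently
    // holds the fewest topics (ties go to the earliest registered member).
    static Map<String, List<String>> assign(Map<String, List<String>> metadata) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String member : metadata.keySet()) {
            assignment.put(member, new ArrayList<>());
        }
        // Collect the distinct topics in first-seen order.
        List<String> topics = new ArrayList<>();
        for (List<String> interests : metadata.values()) {
            for (String topic : interests) {
                if (!topics.contains(topic)) topics.add(topic);
            }
        }
        for (String topic : topics) {
            String chosen = null;
            for (Map.Entry<String, List<String>> entry : metadata.entrySet()) {
                if (!entry.getValue().contains(topic)) continue;
                String member = entry.getKey();
                if (chosen == null
                        || assignment.get(member).size() < assignment.get(chosen).size()) {
                    chosen = member;
                }
            }
            assignment.get(chosen).add(topic);
        }
        return assignment;
    }

    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();
        // Step 1: members register with their metadata.
        coordinator.register("member-a", List.of("orders", "payments"));
        coordinator.register("member-b", List.of("orders", "refunds"));
        // Step 2: the coordinator fixes the membership and picks a leader.
        String leader = coordinator.electLeader();
        // Step 3: the leader computes the state assignment.
        Map<String, List<String>> state = assign(coordinator.memberMetadata());
        // Step 4: each member receives its state and begins processing.
        state.forEach((member, topics) -> System.out.println(member + " -> " + topics));
        System.out.println("leader = " + leader + ", generation = " + coordinator.generation());
    }
}

// Hypothetical in-memory stand-in for the coordinator; not a Kafka class.
class ToyCoordinator {
    // memberId -> metadata (here: the topics the member is interested in)
    private final Map<String, List<String>> registered = new LinkedHashMap<>();
    private int generation = 0;

    void register(String memberId, List<String> topics) {
        registered.put(memberId, topics);
    }

    // Assumption: the first registered member becomes the leader.
    String electLeader() {
        generation++;
        return registered.keySet().iterator().next();
    }

    int generation() {
        return generation;
    }

    Map<String, List<String>> memberMetadata() {
        return registered;
    }
}
```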
To leverage this protocol, an implementation must define the format of the metadata that each member provides for group registration in metadata(), and the format of the state assignment, which the leader provides in onLeaderElected(String, String, List, boolean) and which becomes available to members in onJoinComplete(int, String, String, ByteBuffer).

Note on locking: this class shares state between the caller and a background thread which is used for sending heartbeats after the client has joined the group. All mutable state as well as state transitions are protected with the class's monitor. Generally this means acquiring the lock before reading or writing the state of the group (e.g. generation, memberId) and holding the lock when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).
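As a rough illustration of the locking note, the sketch below guards two pieces of group state with a single object monitor so that a background thread always reads them as a consistent pair. The GroupStateSketch class and its methods are hypothetical and are not taken from the Kafka source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of the locking discipline; not Kafka's code.
class GroupStateSketch {
    private int generation = -1;   // guarded by the monitor of this object
    private String memberId = "";  // guarded by the monitor of this object

    // Caller thread: update both fields as one state transition.
    synchronized void onJoined(int newGeneration, String newMemberId) {
        generation = newGeneration;
        memberId = newMemberId;
    }

    // Background thread: read both fields under the same monitor.
    synchronized String snapshot() {
        return memberId + "@" + generation;
    }

    public static void main(String[] args) throws InterruptedException {
        GroupStateSketch state = new GroupStateSketch();
        AtomicBoolean torn = new AtomicBoolean(false);
        Thread heartbeat = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                String s = state.snapshot();
                String[] parts = s.split("@");
                boolean initial = s.equals("@-1");
                // A torn read would pair a member id with the wrong generation.
                if (!initial && !parts[0].equals("member-" + parts[1])) {
                    torn.set(true);
                }
            }
        });
        heartbeat.start();
        for (int g = 0; g < 10_000; g++) {
            state.onJoined(g, "member-" + g);
        }
        heartbeat.join();
        System.out.println("torn read observed: " + torn.get());
    }
}
```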
  • Constructor Details

    • AbstractCoordinator

      public AbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time)
      Initialize the coordination manager.
    • AbstractCoordinator

      public AbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter)
  • Method Details

    • protocolType

      protected abstract String protocolType()
      Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").
      Returns:
      Non-null protocol type name
    • metadata

      protected abstract org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()
      Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).
      Returns:
      Non-empty collection of supported protocols and their metadata
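      The preference rule described above can be sketched as a small voting procedure. This is a simplified model under stated assumptions: only protocols supported by every member are candidates, each member votes for its most preferred candidate, and ties go to the protocol that received a vote first. The ProtocolSelectionSketch class is hypothetical, the protocol names are only examples, and the coordinator's actual selection logic may differ.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of preference-based protocol selection.
class ProtocolSelectionSketch {
    // Each inner list holds one member's protocols, most preferred first.
    static String select(List<List<String>> memberPreferences) {
        // Candidates are the protocols that every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> preferences : memberPreferences) {
            candidates.retainAll(preferences);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("members share no common protocol");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> preferences : memberPreferences) {
            for (String protocol : preferences) {
                if (candidates.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // The candidate with the most votes wins (assumed tie-break: first voted).
        String best = null;
        int bestVotes = -1;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > bestVotes) {
                best = entry.getKey();
                bestVotes = entry.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<List<String>> preferences = List.of(
                List.of("cooperative-sticky", "range"),
                List.of("range"));
        System.out.println("selected protocol: " + select(preferences));
    }
}
```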
    • onJoinPrepare

      protected abstract boolean onJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId)
      Invoked prior to each group join or rejoin. This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer).
      Parameters:
      timer - Timer bounding how long this method can block
      generation - The previous generation or -1 if there was none
      memberId - The identifier of this member in the previous group or "" if there was none
      Returns:
      true if the asynchronous commit performed in onJoinPrepare succeeded, false otherwise
    • onLeaderElected

      protected abstract Map<String,ByteBuffer> onLeaderElected(String leaderId, String protocol, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, boolean skipAssignment)
      Invoked when the leader is elected. This is used by the leader to perform the assignment if necessary and to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer).
      Parameters:
      leaderId - The id of the leader (which is this member)
      protocol - The protocol selected by the coordinator
      allMemberMetadata - Metadata from all members of the group
      skipAssignment - True if leader must skip running the assignor
      Returns:
      A map from each member to their state assignment
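      To make the shape of the return value concrete, here is a minimal sketch of a leader building a map from member id to serialized state. The LeaderAssignmentSketch class, the round-robin split of numbered work items, and the comma-separated UTF-8 encoding are all assumptions chosen for illustration; a real implementation defines its own assignment logic and wire format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical leader-side assignment; not a Kafka assignor.
class LeaderAssignmentSketch {
    // Split work items 0..workItems-1 across members round-robin and
    // encode each member's share as a comma-separated UTF-8 string.
    static Map<String, ByteBuffer> assign(List<String> memberIds, int workItems) {
        Map<String, StringBuilder> shares = new LinkedHashMap<>();
        for (String id : memberIds) {
            shares.put(id, new StringBuilder());
        }
        for (int item = 0; item < workItems; item++) {
            StringBuilder share = shares.get(memberIds.get(item % memberIds.size()));
            if (share.length() > 0) share.append(',');
            share.append(item);
        }
        Map<String, ByteBuffer> result = new LinkedHashMap<>();
        shares.forEach((id, share) -> result.put(
                id, ByteBuffer.wrap(share.toString().getBytes(StandardCharsets.UTF_8))));
        return result;
    }

    // Member-side decoding of the state received from the leader.
    static String decode(ByteBuffer buffer) {
        // duplicate() leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        Map<String, ByteBuffer> assignment = assign(List.of("member-a", "member-b"), 5);
        assignment.forEach((id, state) -> System.out.println(id + " -> " + decode(state)));
    }
}
```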
    • onJoinComplete

      protected abstract void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment)
      Invoked when a group member has successfully joined a group. If this call fails with an exception, then it will be retried using the same assignment state on the next call to ensureActiveGroup().
      Parameters:
      generation - The generation that was joined
      memberId - The identifier for the local member in the group
      protocol - The protocol selected by the coordinator
      memberAssignment - The assignment propagated from the group leader
    • onLeavePrepare

      protected void onLeavePrepare()
      Invoked prior to each leave group event. This is typically used to clean up assigned partitions. Note that it is triggered only by the consumer's API caller thread; the background heartbeat thread does not trigger it, even when it forces the member to leave the group after the heartbeat session expires.
    • ensureCoordinatorReady

      protected boolean ensureCoordinatorReady(org.apache.kafka.common.utils.Timer timer)
      Ensure that the coordinator is ready to receive requests.
      Parameters:
      timer - Timer bounding how long this method can block
      Returns:
      true if coordinator discovery and initial connection succeeded, false otherwise
    • ensureCoordinatorReadyAsync

      protected boolean ensureCoordinatorReadyAsync()
      Ensure that the coordinator is ready to receive requests. This will return immediately without blocking. It is intended to be called in an asynchronous context when wakeups are not expected.
      Returns:
      true if coordinator discovery and initial connection succeeded, false otherwise
    • lookupCoordinator

      protected RequestFuture<Void> lookupCoordinator()
    • rejoinNeededOrPending

      protected boolean rejoinNeededOrPending()
      Check whether the group should be rejoined (e.g. if metadata changes) or whether a rejoin request is already in flight and needs to be completed.
      Returns:
      true if a rejoin is needed or already pending, false otherwise
    • pollHeartbeat

      protected void pollHeartbeat(long now)
      Check the status of the heartbeat thread (if it is active) and indicate the liveness of the client. This must be called periodically after joining with ensureActiveGroup() to ensure that the member stays in the group. If an interval of time longer than the provided rebalance timeout expires without calling this method, then the client will proactively leave the group.
      Parameters:
      now - current time in milliseconds
      Throws:
      RuntimeException - for unexpected errors raised from the heartbeat thread
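      The liveness rule described above amounts to comparing the time since the last poll with the rebalance timeout. The sketch below models only that comparison; the PollLivenessSketch class and its method names are hypothetical, and the 300000 ms timeout is an arbitrary example value.

```java
// Hypothetical model of the poll liveness check; not Kafka's code.
class PollLivenessSketch {
    private final long rebalanceTimeoutMs;
    private long lastPollMs;

    PollLivenessSketch(long rebalanceTimeoutMs, long nowMs) {
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.lastPollMs = nowMs;
    }

    // Called by the application thread each time it polls.
    synchronized void poll(long nowMs) {
        lastPollMs = nowMs;
    }

    // Checked by a background thread: true once more than the rebalance
    // timeout has elapsed since the last poll, at which point the member
    // would proactively leave the group.
    synchronized boolean pollTimeoutExpired(long nowMs) {
        return nowMs - lastPollMs > rebalanceTimeoutMs;
    }

    public static void main(String[] args) {
        PollLivenessSketch liveness = new PollLivenessSketch(300_000L, 0L);
        System.out.println("expired at 100s: " + liveness.pollTimeoutExpired(100_000L));
        System.out.println("expired at 301s: " + liveness.pollTimeoutExpired(301_000L));
        liveness.poll(301_000L);
        System.out.println("expired after polling: " + liveness.pollTimeoutExpired(302_000L));
    }
}
```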
    • timeToNextHeartbeat

      protected long timeToNextHeartbeat(long now)
    • ensureActiveGroup

      public void ensureActiveGroup()
      Ensure that the group is active (i.e. joined and synced).
    • coordinatorUnknown

      public boolean coordinatorUnknown()
      Check if we know who the coordinator is and have an active connection to it.
      Returns:
      true if the coordinator is unknown
    • checkAndGetCoordinator

      protected org.apache.kafka.common.Node checkAndGetCoordinator()
      Get the coordinator if its connection is still active. Otherwise mark it unknown and return null.
      Returns:
      the current coordinator or null if it is unknown
    • markCoordinatorUnknown

      protected void markCoordinatorUnknown(org.apache.kafka.common.protocol.Errors error)
    • markCoordinatorUnknown

      protected void markCoordinatorUnknown(String cause)
    • markCoordinatorUnknown

      protected void markCoordinatorUnknown(boolean isDisconnected, String cause)
    • generation

      protected AbstractCoordinator.Generation generation()
      Get the current generation state, regardless of whether it is currently stable. Note that the generation information can be updated while we are still in the middle of a rebalance, after the join-group response is received.
      Returns:
      the current generation
    • generationIfStable

      protected AbstractCoordinator.Generation generationIfStable()
      Get the current generation state if the group is stable, otherwise return null.
      Returns:
      the current generation or null
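      The difference between generation() and generationIfStable() can be sketched as follows. The GenerationSketch class, its State enum, and the onJoinResponse/onSyncComplete hooks are hypothetical; they only model the documented behavior that generation data may change mid-rebalance while the "if stable" accessor returns null until the group settles.

```java
// Hypothetical model of the two accessors; not Kafka's implementation.
class GenerationSketch {
    enum State { UNJOINED, REBALANCING, STABLE }

    static final class Generation {
        final int generationId;
        final String memberId;

        Generation(int generationId, String memberId) {
            this.generationId = generationId;
            this.memberId = memberId;
        }
    }

    private State state = State.UNJOINED;
    private Generation generation = new Generation(-1, "");

    // Returns the current generation whether or not the group is stable.
    synchronized Generation generation() {
        return generation;
    }

    // Returns the current generation only when the group is stable.
    synchronized Generation generationIfStable() {
        return state == State.STABLE ? generation : null;
    }

    // The generation is updated as soon as the join response arrives,
    // while the rebalance is still in progress.
    synchronized void onJoinResponse(int generationId, String memberId) {
        generation = new Generation(generationId, memberId);
        state = State.REBALANCING;
    }

    synchronized void onSyncComplete() {
        state = State.STABLE;
    }

    public static void main(String[] args) {
        GenerationSketch group = new GenerationSketch();
        group.onJoinResponse(5, "member-a");
        System.out.println("mid-rebalance id: " + group.generation().generationId);
        System.out.println("stable view is null: " + (group.generationIfStable() == null));
        group.onSyncComplete();
        System.out.println("stable id: " + group.generationIfStable().generationId);
    }
}
```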
    • rebalanceInProgress

      protected boolean rebalanceInProgress()
    • memberId

      protected String memberId()
    • requestRejoinIfNecessary

      public void requestRejoinIfNecessary(String shortReason, String fullReason)
    • requestRejoin

      public void requestRejoin(String shortReason)
    • requestRejoin

      public void requestRejoin(String shortReason, String fullReason)
      Request to rejoin the group.
      Parameters:
      shortReason - This is the reason passed up to the group coordinator. It must be reasonably small.
      fullReason - This is the reason logged locally.
    • close

      public final void close()
      Close the coordinator, waiting if needed to send LeaveGroup.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • close

      protected void close(org.apache.kafka.common.utils.Timer timer)
      Throws:
      org.apache.kafka.common.KafkaException - if the rebalance callback throws an exception
    • maybeLeaveGroup

      public RequestFuture<Void> maybeLeaveGroup(String leaveReason)
      Sends LeaveGroupRequest and logs the leaveReason, unless this member is using static membership or is already not part of the group (i.e. it does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
      Parameters:
      leaveReason - the reason to leave the group for logging
      Throws:
      org.apache.kafka.common.KafkaException - if the rebalance callback throws an exception
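      The conditions under which a LeaveGroupRequest is sent can be written as a single predicate. The sketch below is a hypothetical restatement of the description above; the LeaveDecisionSketch class, the reduced MemberState enum, and the use of an empty string for "no valid member id" are assumptions for illustration.

```java
// Hypothetical restatement of the documented leave conditions.
class LeaveDecisionSketch {
    enum MemberState { UNJOINED, STABLE }

    static boolean shouldSendLeaveGroup(boolean dynamicMember,
                                        String memberId,
                                        MemberState state,
                                        boolean coordinatorUnknown) {
        return dynamicMember                      // static members do not send LeaveGroup
                && !memberId.isEmpty()            // assumed: "" means no valid member id
                && state != MemberState.UNJOINED  // already out of the group
                && !coordinatorUnknown;           // nobody to send the request to
    }

    public static void main(String[] args) {
        System.out.println("dynamic, joined member: "
                + shouldSendLeaveGroup(true, "member-a", MemberState.STABLE, false));
        System.out.println("static member: "
                + shouldSendLeaveGroup(false, "member-a", MemberState.STABLE, false));
    }
}
```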
    • isDynamicMember

      protected boolean isDynamicMember()
    • createMeter

      protected final org.apache.kafka.common.metrics.stats.Meter createMeter(org.apache.kafka.common.metrics.Metrics metrics, String groupName, String baseName, String descriptiveName)