Class AbstractCoordinator
java.lang.Object
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
- All Implemented Interfaces:
Closeable,AutoCloseable
- Direct Known Subclasses:
ConsumerCoordinator
AbstractCoordinator implements group management for a single group member by interacting with
a designated Kafka broker (the coordinator). Group semantics are provided by extending this class.
See
ConsumerCoordinator for example usage.
From a high level, Kafka's group management protocol consists of the following sequence of actions:
- Group Registration: Group members register with the coordinator providing their own metadata (such as the set of topics they are interested in).
- Group/Leader Selection: The coordinator select the members of the group and chooses one member as the leader.
- State Assignment: The leader collects the metadata from all the members of the group and assigns state.
- Group Stabilization: Each member receives the state assigned by the leader and begins processing.
metadata() and the format of the state assignment provided
by the leader in onLeaderElected(String, String, List, boolean) and becomes available to members in
onJoinComplete(int, String, String, ByteBuffer).
Note on locking: this class shares state between the caller and a background thread which is
used for sending heartbeats after the client has joined the group. All mutable state as well as
state transitions are protected with the class's monitor. Generally this means acquiring the lock
before reading or writing the state of the group (e.g. generation, memberId) and holding the lock
when sending a request that affects the state of the group (e.g. JoinGroup, LeaveGroup).-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionprotected classprotected static classprotected static enum -
Field Summary
FieldsModifier and TypeFieldDescriptionprotected final ConsumerNetworkClientstatic final Stringstatic final intprotected final org.apache.kafka.common.utils.ExponentialBackoffprotected AbstractCoordinator.MemberStateprotected final org.apache.kafka.common.utils.Time -
Constructor Summary
ConstructorsConstructorDescriptionAbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time) Initialize the coordination manager.AbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter) -
Method Summary
Modifier and TypeMethodDescriptionprotected org.apache.kafka.common.NodeGet the coordinator if its connection is still active.final voidclose()Close the coordinator, waiting if needed to send LeaveGroup.protected voidclose(org.apache.kafka.common.utils.Timer timer) booleanCheck if we know who the coordinator is and we have an active connectionprotected final org.apache.kafka.common.metrics.stats.MetercreateMeter(org.apache.kafka.common.metrics.Metrics metrics, String groupName, String baseName, String descriptiveName) voidEnsure that the group is active (i.e.protected booleanensureCoordinatorReady(org.apache.kafka.common.utils.Timer timer) Ensure that the coordinator is ready to receive requests.protected booleanEnsure that the coordinator is ready to receive requests.protected AbstractCoordinator.GenerationGet the current generation state, regardless of whether it is currently stable.protected AbstractCoordinator.GenerationGet the current generation state if the group is stable, otherwise return nullprotected booleanprotected RequestFuture<Void> protected voidmarkCoordinatorUnknown(boolean isDisconnected, String cause) protected voidmarkCoordinatorUnknown(String cause) protected voidmarkCoordinatorUnknown(org.apache.kafka.common.protocol.Errors error) maybeLeaveGroup(String leaveReason) Sends LeaveGroupRequest and logs theleaveReason, unless this member is using static membership or is already not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).protected StringmemberId()protected abstract org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollectionmetadata()Get the current list of protocols and their associated metadata supported by the local member.protected abstract voidonJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) Invoked when a group member has successfully joined a group.protected abstract booleanonJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId) Invoked prior to each group join or rejoin.protected abstract Map<String, ByteBuffer> onLeaderElected(String leaderId, String protocol, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, boolean skipAssignment) Invoked when the leader is elected.protected voidInvoked prior to each leave group event.protected voidpollHeartbeat(long now) Check the status of the heartbeat thread (if it is active) and indicate the liveness of the client.protected abstract StringUnique identifier for the class of supported protocols (e.g.protected booleanprotected booleanCheck whether the group should be rejoined (e.g.voidrequestRejoin(String shortReason) voidrequestRejoin(String shortReason, String fullReason) Request to rejoin the group.voidrequestRejoinIfNecessary(String shortReason, String fullReason) protected longtimeToNextHeartbeat(long now)
-
Field Details
-
HEARTBEAT_THREAD_PREFIX
- See Also:
-
JOIN_GROUP_TIMEOUT_LAPSE
public static final int JOIN_GROUP_TIMEOUT_LAPSE- See Also:
-
time
protected final org.apache.kafka.common.utils.Time time -
client
-
retryBackoff
protected final org.apache.kafka.common.utils.ExponentialBackoff retryBackoff -
state
-
-
Constructor Details
-
AbstractCoordinator
public AbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time) Initialize the coordination manager. -
AbstractCoordinator
public AbstractCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter)
-
-
Method Details
-
protocolType
Unique identifier for the class of supported protocols (e.g. "consumer" or "connect").- Returns:
- Non-null protocol type name
-
metadata
protected abstract org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()Get the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).- Returns:
- Non-empty map of supported protocols and metadata
-
onJoinPrepare
protected abstract boolean onJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId) Invoked prior to each group join or rejoin. This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer)- Parameters:
timer- Timer bounding how long this method can blockgeneration- The previous generation or -1 if there was nonememberId- The identifier of this member in the previous group or "" if there was none- Returns:
- true If onJoinPrepare async commit succeeded, false otherwise
-
onLeaderElected
protected abstract Map<String,ByteBuffer> onLeaderElected(String leaderId, String protocol, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata, boolean skipAssignment) Invoked when the leader is elected. This is used by the leader to perform the assignment if necessary and to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)- Parameters:
leaderId- The id of the leader (which is this member)protocol- The protocol selected by the coordinatorallMemberMetadata- Metadata from all members of the groupskipAssignment- True if leader must skip running the assignor- Returns:
- A map from each member to their state assignment
-
onJoinComplete
protected abstract void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) Invoked when a group member has successfully joined a group. If this call fails with an exception, then it will be retried using the same assignment state on the next call toensureActiveGroup().- Parameters:
generation- The generation that was joinedmemberId- The identifier for the local member in the groupprotocol- The protocol selected by the coordinatormemberAssignment- The assignment propagated from the group leader
-
onLeavePrepare
protected void onLeavePrepare()Invoked prior to each leave group event. This is typically used to cleanup assigned partitions; note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would not trigger it even if it tries to force leaving group upon heartbeat session expiration) -
ensureCoordinatorReady
protected boolean ensureCoordinatorReady(org.apache.kafka.common.utils.Timer timer) Ensure that the coordinator is ready to receive requests.- Parameters:
timer- Timer bounding how long this method can block- Returns:
- true If coordinator discovery and initial connection succeeded, false otherwise
-
ensureCoordinatorReadyAsync
protected boolean ensureCoordinatorReadyAsync()Ensure that the coordinator is ready to receive requests. This will return immediately without blocking. It is intended to be called in an asynchronous context when wakeups are not expected.- Returns:
- true If coordinator discovery and initial connection succeeded, false otherwise
-
lookupCoordinator
-
rejoinNeededOrPending
protected boolean rejoinNeededOrPending()Check whether the group should be rejoined (e.g. if metadata changes) or whether a rejoin request is already in flight and needs to be completed.- Returns:
- true if it should, false otherwise
-
pollHeartbeat
protected void pollHeartbeat(long now) Check the status of the heartbeat thread (if it is active) and indicate the liveness of the client. This must be called periodically after joining withensureActiveGroup()to ensure that the member stays in the group. If an interval of time longer than the provided rebalance timeout expires without calling this method, then the client will proactively leave the group.- Parameters:
now- current time in milliseconds- Throws:
RuntimeException- for unexpected errors raised from the heartbeat thread
-
timeToNextHeartbeat
protected long timeToNextHeartbeat(long now) -
ensureActiveGroup
public void ensureActiveGroup()Ensure that the group is active (i.e. joined and synced) -
coordinatorUnknown
public boolean coordinatorUnknown()Check if we know who the coordinator is and we have an active connection- Returns:
- true if the coordinator is unknown
-
checkAndGetCoordinator
protected org.apache.kafka.common.Node checkAndGetCoordinator()Get the coordinator if its connection is still active. Otherwise mark it unknown and return null.- Returns:
- the current coordinator or null if it is unknown
-
markCoordinatorUnknown
protected void markCoordinatorUnknown(org.apache.kafka.common.protocol.Errors error) -
markCoordinatorUnknown
-
markCoordinatorUnknown
-
generation
Get the current generation state, regardless of whether it is currently stable. Note that the generation information can be updated while we are still in the middle of a rebalance, after the join-group response is received.- Returns:
- the current generation
-
generationIfStable
Get the current generation state if the group is stable, otherwise return null- Returns:
- the current generation or null
-
rebalanceInProgress
protected boolean rebalanceInProgress() -
memberId
-
requestRejoinIfNecessary
-
requestRejoin
-
requestRejoin
Request to rejoin the group.- Parameters:
shortReason- This is the reason passed up to the group coordinator. It must be reasonably small.fullReason- This is the reason logged locally.
-
close
public final void close()Close the coordinator, waiting if needed to send LeaveGroup.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable
-
close
protected void close(org.apache.kafka.common.utils.Timer timer) - Throws:
org.apache.kafka.common.KafkaException- if the rebalance callback throws exception
-
maybeLeaveGroup
Sends LeaveGroupRequest and logs theleaveReason, unless this member is using static membership or is already not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).- Parameters:
leaveReason- the reason to leave the group for logging- Throws:
org.apache.kafka.common.KafkaException- if the rebalance callback throws exception
-
isDynamicMember
protected boolean isDynamicMember() -
createMeter
-