Class MembershipManagerImpl
java.lang.Object
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl
- All Implemented Interfaces:
MembershipManager,org.apache.kafka.common.ClusterResourceListener
public class MembershipManagerImpl
extends Object
implements MembershipManager, org.apache.kafka.common.ClusterResourceListener
Group manager for a single consumer that has a group id defined in the config
ConsumerConfig.GROUP_ID_CONFIG, to use the Kafka-based offset management capability,
and the consumer group protocol to get automatically assigned partitions when calling the
subscribe API.
While the subscribe API hasn't been called (or if the consumer called unsubscribe), this manager
will only be responsible for keeping the member in the MemberState.UNSUBSCRIBED state,
where it can commit offsets to the group identified by the groupId(), without joining
the group.
If the consumer subscribe API is called, this manager will use the groupId() to join the
consumer group, and based on the consumer group protocol heartbeats, will handle the full
lifecycle of the member as it joins the group, reconciles assignments, handles fencing and
fatal errors, and leaves the group.
Reconciliation process:
The member accepts all assignments received from the broker, resolves topic names from
metadata, reconciles the resolved assignments, and keeps the unresolved ones so they can be
reconciled once a metadata update discovers them. Reconciliations of resolved assignments are executed
sequentially and acknowledged to the server as they complete. The reconciliation process
involves multiple async operations, so the member will continue to heartbeat while these
operations complete, to make sure that the member stays in the group while reconciling.
Reconciliation steps:
- Resolve topic names for all topic IDs received in the target assignment. Topic names found in metadata are then ready to be reconciled. Topic IDs not found are kept as unresolved, and the member requests metadata updates until it resolves them (or the broker removes them from the target assignment).
- Commit offsets if auto-commit is enabled.
- Invoke the user-defined onPartitionsRevoked listener.
- Invoke the user-defined onPartitionsAssigned listener.
- When the above steps complete, the member acknowledges the reconciled assignment, which is the subset of the target that was resolved from metadata and actually reconciled. The ack is performed by sending a heartbeat request back to the broker, including the reconciled assignment.
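The steps above can be sketched as follows. This is a simplified, synchronous illustration under stated assumptions, not the actual implementation: the class `ReconciliationSketch`, its `Listener` interface, and its fields are hypothetical, `java.util.UUID` stands in for Kafka topic IDs, assignments are tracked per topic rather than per partition, and the asynchronous commit and callback steps are collapsed into direct calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, simplified model of the reconciliation steps; not the Kafka implementation.
final class ReconciliationSketch {
    // Stand-in for the user-defined rebalance listener.
    interface Listener {
        void onPartitionsRevoked(Set<String> topics);
        void onPartitionsAssigned(Set<String> topics);
    }

    final Map<UUID, String> metadata = new HashMap<>(); // topic ID -> name known from metadata
    final Set<UUID> unresolved = new HashSet<>();        // topic IDs waiting for a metadata update
    final Set<String> owned = new HashSet<>();           // topics currently owned by the member
    final List<String> log = new ArrayList<>();          // records the order in which steps ran
    boolean autoCommit = true;

    // Returns the reconciled assignment that would be acknowledged in the next heartbeat.
    Set<String> reconcile(Set<UUID> target, Listener listener) {
        // Step 1: resolve topic names; keep what cannot be resolved for later.
        Set<String> resolved = new HashSet<>();
        unresolved.clear();
        for (UUID id : target) {
            String name = metadata.get(id);
            if (name != null) {
                resolved.add(name);
            } else {
                unresolved.add(id);
            }
        }
        if (!unresolved.isEmpty()) {
            log.add("requestMetadataUpdate");
        }

        // Step 2: commit offsets if auto-commit is enabled.
        if (autoCommit) {
            log.add("commit");
        }

        // Step 3: invoke the revocation listener with topics that are no longer assigned.
        Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(resolved);
        listener.onPartitionsRevoked(revoked);
        log.add("revoked");

        // Step 4: invoke the assignment listener with newly added topics.
        Set<String> added = new HashSet<>(resolved);
        added.removeAll(owned);
        listener.onPartitionsAssigned(added);
        log.add("assigned");

        // Step 5: acknowledge only the subset that was resolved and reconciled.
        owned.clear();
        owned.addAll(resolved);
        log.add("ack");
        return new HashSet<>(owned);
    }

    public static void main(String[] args) {
        ReconciliationSketch member = new ReconciliationSketch();
        UUID known = UUID.randomUUID();
        UUID unknown = UUID.randomUUID();
        member.metadata.put(known, "orders");
        Listener printing = new Listener() {
            public void onPartitionsRevoked(Set<String> topics) { System.out.println("revoked " + topics); }
            public void onPartitionsAssigned(Set<String> topics) { System.out.println("assigned " + topics); }
        };
        System.out.println("acknowledged " + member.reconcile(Set.of(known, unknown), printing));
        System.out.println("still unresolved: " + member.unresolved.size());
    }
}
```

In this sketch the acknowledged set contains only the topics that could be resolved, which mirrors the point that the ack covers the subset of the target that was actually reconciled.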
-
Constructor Summary
Constructors
- MembershipManagerImpl(String groupId, Optional<String> groupInstanceId, int rebalanceTimeoutMs, Optional<String> serverAssignor, SubscriptionState subscriptions, CommitRequestManager commitRequestManager, ConsumerMetadata metadata, org.apache.kafka.common.utils.LogContext logContext, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter, BackgroundEventHandler backgroundEventHandler, org.apache.kafka.common.utils.Time time)
Method Summary
Modifier and Type, Method, Description
- void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event): Signals that a ConsumerRebalanceListener callback has completed.
- currentAssignment()
- groupId()
- groupInstanceId()
- boolean isStaled()
- leaveGroup(): Release assignment and transition to MemberState.PREPARE_LEAVING so that a heartbeat request is sent indicating to the broker that the member wants to leave the group.
- int memberEpoch()
- memberId()
- void onHeartbeatRequestSent(): Update state when a heartbeat is sent out.
- void onHeartbeatRequestSkipped(): Transition out of the MemberState.LEAVING state even if the heartbeat was not sent.
- void onHeartbeatResponseReceived(org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData response): Update member info and transition member state based on a successful heartbeat response.
- void onSubscriptionUpdated(): Join the group with the updated subscription, if the member is not part of it yet.
- void onUpdate(org.apache.kafka.common.ClusterResource clusterResource): When cluster metadata is updated, try to resolve topic names for topic IDs received in assignments that haven't been resolved yet.
- void registerStateListener(MemberStateListener listener): Register a new listener that will be invoked whenever the member state changes, or a new member ID or epoch is received.
- serverAssignor()
- boolean shouldHeartbeatNow()
- boolean shouldSkipHeartbeat()
- state()
- void transitionToFatal(): Transition the member to the FAILED state and update the member info as required.
- void transitionToFenced(): Transition the member to the FENCED state, where the member will release the assignment by calling the onPartitionsLost callback, and when the callback completes, it will transition to MemberState.JOINING to rejoin the group.
- void transitionToJoining(): Transition to the MemberState.JOINING state, indicating that the member will try to join the group on the next heartbeat request.
- void transitionToStale(): Sets the epoch to the leave group epoch and clears the assignments.
-
Constructor Details
-
MembershipManagerImpl
public MembershipManagerImpl(String groupId, Optional<String> groupInstanceId, int rebalanceTimeoutMs, Optional<String> serverAssignor, SubscriptionState subscriptions, CommitRequestManager commitRequestManager, ConsumerMetadata metadata, org.apache.kafka.common.utils.LogContext logContext, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter, BackgroundEventHandler backgroundEventHandler, org.apache.kafka.common.utils.Time time)
-
-
Method Details
-
groupId
- Specified by:
groupId in interface MembershipManager
- Returns:
- Group ID of the consumer group the member is part of (or wants to be part of).
-
groupInstanceId
- Specified by:
groupInstanceId in interface MembershipManager
- Returns:
- Instance ID used by the member when joining the group. If non-empty, it will indicate that this is a static member.
-
memberId
- Specified by:
memberId in interface MembershipManager
- Returns:
- Member ID assigned by the server to this member when it joins the consumer group.
-
memberEpoch
public int memberEpoch()
- Specified by:
memberEpoch in interface MembershipManager
- Returns:
- Current epoch of the member, maintained by the server.
-
isStaled
public boolean isStaled()
- Specified by:
isStaled in interface MembershipManager
- Returns:
- True if the member is stale due to an expired poll timer.
-
onHeartbeatResponseReceived
public void onHeartbeatResponseReceived(org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData response)
Update member info and transition member state based on a successful heartbeat response.
- Specified by:
onHeartbeatResponseReceived in interface MembershipManager
- Parameters:
response - Heartbeat response to extract member info and errors from.
-
transitionToFenced
public void transitionToFenced()
Transition the member to the FENCED state, where the member will release the assignment by calling the onPartitionsLost callback, and when the callback completes, it will transition to MemberState.JOINING to rejoin the group. This is expected to be invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
- Specified by:
transitionToFenced in interface MembershipManager
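The fencing flow described for transitionToFenced can be illustrated with a minimal sketch. The class `FencedTransitionSketch` and its reduced `State` enum are hypothetical, and the `CompletableFuture` argument stands in for the completion of the onPartitionsLost callback. The sketch also assumes the member rejoins whether or not the callback failed, which the description above does not state.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the fenced flow; names do not mirror the real class.
final class FencedTransitionSketch {
    enum State { STABLE, FENCED, JOINING }

    State state = State.STABLE;

    // callbackResult completes when the onPartitionsLost callback has finished running.
    void transitionToFenced(CompletableFuture<Void> callbackResult) {
        state = State.FENCED;
        callbackResult.whenComplete((ignored, error) -> {
            // Assumption: rejoin once the callback completes, even if it completed with an error.
            if (state == State.FENCED) {
                state = State.JOINING;
            }
        });
    }

    public static void main(String[] args) {
        FencedTransitionSketch member = new FencedTransitionSketch();
        CompletableFuture<Void> callback = new CompletableFuture<>();
        member.transitionToFenced(callback);
        System.out.println("after fencing: " + member.state);
        callback.complete(null); // the application thread finished onPartitionsLost
        System.out.println("after callback: " + member.state);
    }
}
```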
-
transitionToFatal
public void transitionToFatal()
Transition the member to the FAILED state and update the member info as required. This is invoked when unrecoverable errors occur (e.g. when the heartbeat returns a non-retriable error).
- Specified by:
transitionToFatal in interface MembershipManager
-
onSubscriptionUpdated
public void onSubscriptionUpdated()
Join the group with the updated subscription, if the member is not part of it yet. If the member is already part of the group, this will only ensure that the updated subscription is included in the next heartbeat request. Note that the list of topics in the subscription is taken from the shared subscription state.
- Specified by:
onSubscriptionUpdated in interface MembershipManager
-
transitionToJoining
public void transitionToJoining()
Transition to the MemberState.JOINING state, indicating that the member will try to join the group on the next heartbeat request. This is expected to be invoked when the user calls the subscribe API, or when the member wants to rejoin after getting fenced. Visible for testing.
- Specified by:
transitionToJoining in interface MembershipManager
-
leaveGroup
Release assignment and transition to MemberState.PREPARE_LEAVING so that a heartbeat request is sent indicating to the broker that the member wants to leave the group. This is expected to be invoked when the user calls the unsubscribe API.
- Specified by:
leaveGroup in interface MembershipManager
- Returns:
- Future that will complete when the callback execution completes and the heartbeat to leave the group has been sent out.
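A minimal sketch of how such a future could be wired, assuming a design in which the callback completion moves the member from PREPARE_LEAVING to LEAVING and the outgoing heartbeat completes the returned future. `LeaveGroupSketch`, its reduced `State` enum, and the UNSUBSCRIBED target state after the heartbeat are assumptions made for illustration, not details taken from this page.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the leave-group future; not the actual implementation.
final class LeaveGroupSketch {
    enum State { STABLE, PREPARE_LEAVING, LEAVING, UNSUBSCRIBED }

    State state = State.STABLE;
    private CompletableFuture<Void> leaveResult = new CompletableFuture<>();

    // callbackResult completes when the user callback releasing the assignment has run.
    CompletableFuture<Void> leaveGroup(CompletableFuture<Void> callbackResult) {
        state = State.PREPARE_LEAVING;
        leaveResult = new CompletableFuture<>();
        // Assumption: once the callback completes, the member is ready to send the leave heartbeat.
        callbackResult.whenComplete((ignored, error) -> state = State.LEAVING);
        return leaveResult;
    }

    // Called when a heartbeat request has been sent out, without waiting for a response.
    void onHeartbeatRequestSent() {
        if (state == State.LEAVING) {
            state = State.UNSUBSCRIBED; // assumed target state after leaving
            leaveResult.complete(null);
        }
    }

    public static void main(String[] args) {
        LeaveGroupSketch member = new LeaveGroupSketch();
        CompletableFuture<Void> callback = new CompletableFuture<>();
        CompletableFuture<Void> left = member.leaveGroup(callback);
        callback.complete(null);
        member.onHeartbeatRequestSent();
        System.out.println("leave completed: " + left.isDone() + ", state: " + member.state);
    }
}
```

The returned future stays incomplete until both conditions hold, so a caller that waits on it does not proceed while the callback is still running or the heartbeat has not yet gone out.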
-
shouldHeartbeatNow
public boolean shouldHeartbeatNow()
- Specified by:
shouldHeartbeatNow in interface MembershipManager
- Returns:
- True if the member should send a heartbeat to the coordinator without waiting for the interval.
-
onHeartbeatRequestSent
public void onHeartbeatRequestSent()
Update state when a heartbeat is sent out. This will transition out of the states that end when a heartbeat request is sent, without waiting for a response (e.g. MemberState.ACKNOWLEDGING and MemberState.LEAVING).
- Specified by:
onHeartbeatRequestSent in interface MembershipManager
-
onHeartbeatRequestSkipped
public void onHeartbeatRequestSkipped()
Transition out of the MemberState.LEAVING state even if the heartbeat was not sent. This will ensure that the member is not blocked on MemberState.LEAVING (best effort to send the request, without any response handling or retry logic).
- Specified by:
onHeartbeatRequestSkipped in interface MembershipManager
-
shouldSkipHeartbeat
public boolean shouldSkipHeartbeat()
- Specified by:
shouldSkipHeartbeat in interface MembershipManager
- Returns:
- True if the member should skip sending the heartbeat to the coordinator. This could be the case when the member is not in a group, or when it failed with a fatal error.
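As a rough illustration of the two cases named above, the check can be thought of as a predicate over the member state. The class `HeartbeatSkipSketch` and its reduced `State` enum are hypothetical, and the mapping of "not in a group" to UNSUBSCRIBED and of "fatal error" to FAILED is an assumption of this sketch.

```java
// Hypothetical sketch of the skip-heartbeat check; the state names are a reduced, assumed set.
final class HeartbeatSkipSketch {
    enum State { UNSUBSCRIBED, JOINING, STABLE, FAILED }

    // Skip when the member is not in a group (UNSUBSCRIBED) or failed with a fatal error (FAILED).
    static boolean shouldSkipHeartbeat(State state) {
        return state == State.UNSUBSCRIBED || state == State.FAILED;
    }

    public static void main(String[] args) {
        for (State state : State.values()) {
            System.out.println(state + " -> skip heartbeat: " + shouldSkipHeartbeat(state));
        }
    }
}
```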
-
transitionToStale
public void transitionToStale()
Sets the epoch to the leave group epoch and clears the assignments. The member will rejoin with the existing subscriptions the next time the user polls.
- Specified by:
transitionToStale in interface MembershipManager
-
consumerRebalanceListenerCallbackCompleted
public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event)
Description copied from interface: MembershipManager
Signals that a ConsumerRebalanceListener callback has completed. This is invoked when the application thread has completed the callback and has submitted a ConsumerRebalanceListenerCallbackCompletedEvent to the network I/O thread. At this point, we notify the state machine that it's complete so that it can move to the next appropriate step of the rebalance process.
- Specified by:
consumerRebalanceListenerCallbackCompleted in interface MembershipManager
- Parameters:
event - Event with details about the callback that was executed
-
state
- Specified by:
state in interface MembershipManager
- Returns:
- Current state of this member in relationship to a consumer group, as defined in
MemberState.
-
serverAssignor
- Specified by:
serverAssignor in interface MembershipManager
- Returns:
- Server-side assignor implementation configured for the member, that will be sent out to the server to be used. If empty, then the server will select the assignor.
-
currentAssignment
- Specified by:
currentAssignment in interface MembershipManager
- Returns:
- Current assignment for the member as received from the broker (topic IDs and partitions). This is the last assignment that the member has successfully reconciled.
-
onUpdate
public void onUpdate(org.apache.kafka.common.ClusterResource clusterResource)
When cluster metadata is updated, try to resolve topic names for topic IDs received in assignments that haven't been resolved yet.
- Try to find topic names for all unresolved assignments
- Add discovered topic names to the local topic names cache
- If any topics are resolved, trigger a reconciliation process
- If some topics still remain unresolved, request another metadata update
- Specified by:
onUpdate in interface org.apache.kafka.common.ClusterResourceListener
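The four steps listed for onUpdate can be sketched as below. This is an illustration only: `MetadataUpdateSketch` and its fields are hypothetical, `java.util.UUID` stands in for Kafka topic IDs, a plain map stands in for the latest metadata, and the reconciliation and metadata request are reduced to boolean flags.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch of resolving pending topic IDs on a metadata update.
final class MetadataUpdateSketch {
    final Map<UUID, String> topicNamesCache = new HashMap<>(); // local cache of resolved names
    final Set<UUID> unresolved = new HashSet<>();               // topic IDs still waiting for a name
    boolean reconciliationTriggered;
    boolean metadataUpdateRequested;

    // latestMetadata: topic ID -> topic name, as known after the metadata update.
    void onUpdate(Map<UUID, String> latestMetadata) {
        reconciliationTriggered = false;
        metadataUpdateRequested = false;
        boolean anyResolved = false;

        // Try to find topic names for all unresolved assignments.
        for (Iterator<UUID> it = unresolved.iterator(); it.hasNext(); ) {
            UUID id = it.next();
            String name = latestMetadata.get(id);
            if (name != null) {
                topicNamesCache.put(id, name); // add the discovered name to the local cache
                it.remove();
                anyResolved = true;
            }
        }

        // If any topics were resolved, trigger a reconciliation.
        if (anyResolved) {
            reconciliationTriggered = true;
        }
        // If some topics remain unresolved, request another metadata update.
        if (!unresolved.isEmpty()) {
            metadataUpdateRequested = true;
        }
    }

    public static void main(String[] args) {
        MetadataUpdateSketch member = new MetadataUpdateSketch();
        UUID orders = UUID.randomUUID();
        UUID payments = UUID.randomUUID();
        member.unresolved.add(orders);
        member.unresolved.add(payments);
        member.onUpdate(Map.of(orders, "orders"));
        System.out.println("reconcile: " + member.reconciliationTriggered
                + ", request metadata: " + member.metadataUpdateRequested);
    }
}
```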
-
registerStateListener
Register a new listener that will be invoked whenever the member state changes, or a new member ID or epoch is received.
- Specified by:
registerStateListener in interface MembershipManager
- Parameters:
listener - Listener to invoke.
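A minimal sketch of the listener registration pattern described above. The names here are hypothetical: `StateListenerSketch`, its nested `Listener` interface, and the `onMemberInfoUpdated` method are illustrative and do not reproduce the MemberStateListener API, and member state is modelled as a plain string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of notifying registered listeners on state, member ID, or epoch changes.
final class StateListenerSketch {
    // Illustrative callback; not the real MemberStateListener signature.
    interface Listener {
        void onMemberInfoUpdated(String state, String memberId, int epoch);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private String state = "UNSUBSCRIBED";
    private String memberId = "";
    private int epoch = 0;

    void registerStateListener(Listener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        listeners.add(listener);
    }

    // Notifies listeners only when the state actually changes.
    void transitionTo(String newState) {
        if (!newState.equals(state)) {
            state = newState;
            notifyListeners();
        }
    }

    // Notifies listeners only when a new member ID or epoch is received.
    void updateMemberInfo(String newMemberId, int newEpoch) {
        if (!newMemberId.equals(memberId) || newEpoch != epoch) {
            memberId = newMemberId;
            epoch = newEpoch;
            notifyListeners();
        }
    }

    private void notifyListeners() {
        for (Listener listener : listeners) {
            listener.onMemberInfoUpdated(state, memberId, epoch);
        }
    }

    public static void main(String[] args) {
        StateListenerSketch member = new StateListenerSketch();
        member.registerStateListener((s, id, e) ->
                System.out.println("state=" + s + " memberId=" + id + " epoch=" + e));
        member.transitionTo("JOINING");
        member.updateMemberInfo("member-1", 1);
    }
}
```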
-