Class MembershipManagerImpl

java.lang.Object
org.apache.kafka.clients.consumer.internals.MembershipManagerImpl
All Implemented Interfaces:
MembershipManager, org.apache.kafka.common.ClusterResourceListener

public class MembershipManagerImpl extends Object implements MembershipManager, org.apache.kafka.common.ClusterResourceListener
Group manager for a single consumer that has a group id defined in the config ConsumerConfig.GROUP_ID_CONFIG, to use the Kafka-based offset management capability, and the consumer group protocol to get automatically assigned partitions when calling the subscribe API.

While the subscribe API hasn't been called (or if the consumer called unsubscribe), this manager will only be responsible for keeping the member in the MemberState.UNSUBSCRIBED state, where it can commit offsets to the group identified by the groupId(), without joining the group.

If the consumer subscribe API is called, this manager will use the groupId() to join the consumer group, and based on the consumer group protocol heartbeats, will handle the full lifecycle of the member as it joins the group, reconciles assignments, handles fencing and fatal errors, and leaves the group.

Reconciliation process:

The member accepts all assignments received from the broker, resolves topic names from metadata, reconciles the resolved assignments, and keeps the unresolved ones to be reconciled once a metadata update discovers them. Reconciliations of resolved assignments are executed sequentially and acknowledged to the server as they complete. The reconciliation process involves multiple async operations, so the member continues to heartbeat while these operations complete, to make sure that it stays in the group while reconciling.

Reconciliation steps:

  1. Resolve topic names for all topic IDs received in the target assignment. Topic names found in metadata are then ready to be reconciled. Topic IDs not found are kept as unresolved, and the member requests metadata updates until it resolves them (or the broker removes them from the target assignment).
  2. Commit offsets if auto-commit is enabled.
  3. Invoke the user-defined onPartitionsRevoked listener.
  4. Invoke the user-defined onPartitionsAssigned listener.
  5. When the above steps complete, the member acknowledges the reconciled assignment, which is the subset of the target that was resolved from metadata and actually reconciled. The ack is performed by sending a heartbeat request back to the broker, including the reconciled assignment.
Note that user-defined callbacks are triggered from this manager, which runs in the BackgroundThread, but are executed in the Application Thread, where a failure will be returned to the user if the callbacks fail. This manager is only concerned with the completion of the callbacks, so that it knows it can proceed with the reconciliation.
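The ordering of steps 2 to 5 can be illustrated with a chain of futures. The following is only a minimal sketch under stated assumptions: the class ReconciliationSketch, its step helper, and the log field are hypothetical names invented for this illustration and are not part of Kafka, and each step completes immediately instead of waiting on real async work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; none of these names exist in Kafka.
class ReconciliationSketch {
    // Records the order in which the steps ran.
    final List<String> log = new ArrayList<>();

    // Stand-in for an async operation; here it completes immediately.
    CompletableFuture<Void> step(String name) {
        log.add(name);
        return CompletableFuture.completedFuture(null);
    }

    // Runs the steps one after another; each starts only when the previous one completes.
    CompletableFuture<Void> reconcile(boolean autoCommitEnabled) {
        CompletableFuture<Void> commit = autoCommitEnabled
                ? step("commit offsets")
                : CompletableFuture.completedFuture(null);
        return commit
                .thenCompose(v -> step("onPartitionsRevoked"))
                .thenCompose(v -> step("onPartitionsAssigned"))
                // The acknowledgment is sent only after all previous steps complete.
                .thenRun(() -> log.add("acknowledge via heartbeat"));
    }
}
```

Chaining with thenCompose keeps the steps sequential without blocking the calling thread, which is consistent with the member continuing to heartbeat while a reconciliation is in progress.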
  • Constructor Details

  • Method Details

    • groupId

      public String groupId()
      Specified by:
      groupId in interface MembershipManager
      Returns:
      Group ID of the consumer group the member is part of (or wants to be part of).
    • groupInstanceId

      public Optional<String> groupInstanceId()
      Specified by:
      groupInstanceId in interface MembershipManager
      Returns:
      Instance ID used by the member when joining the group. If non-empty, it will indicate that this is a static member.
    • memberId

      public String memberId()
      Specified by:
      memberId in interface MembershipManager
      Returns:
      Member ID assigned by the server to this member when it joins the consumer group.
    • memberEpoch

      public int memberEpoch()
      Specified by:
      memberEpoch in interface MembershipManager
      Returns:
      Current epoch of the member, maintained by the server.
    • isStaled

      public boolean isStaled()
      Specified by:
      isStaled in interface MembershipManager
      Returns:
      True if the member is stale because the poll timer expired.
    • onHeartbeatResponseReceived

      public void onHeartbeatResponseReceived(org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData response)
      Update member info and transition member state based on a successful heartbeat response.
      Specified by:
      onHeartbeatResponseReceived in interface MembershipManager
      Parameters:
      response - Heartbeat response to extract member info and errors from.
    • transitionToFenced

      public void transitionToFenced()
      Transition the member to the FENCED state, where the member will release the assignment by calling the onPartitionsLost callback, and when the callback completes, it will transition to MemberState.JOINING to rejoin the group. This is expected to be invoked when the heartbeat returns a FENCED_MEMBER_EPOCH or UNKNOWN_MEMBER_ID error.
      Specified by:
      transitionToFenced in interface MembershipManager
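The fenced flow described above (release the assignment through onPartitionsLost, then rejoin) could be sketched as follows. This is a simplified illustration, not the actual implementation: FencedTransitionSketch, its State enum, the IN_GROUP placeholder state, and the onPartitionsLostDone future are all hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; not the Kafka implementation.
class FencedTransitionSketch {
    // IN_GROUP is a placeholder for "any state while part of the group".
    enum State { IN_GROUP, FENCED, JOINING }

    State state = State.IN_GROUP;

    // Assumed to be completed by the application thread when onPartitionsLost returns.
    final CompletableFuture<Void> onPartitionsLostDone = new CompletableFuture<>();

    void transitionToFenced() {
        state = State.FENCED;
        // Move to JOINING only once the callback has completed.
        onPartitionsLostDone.thenRun(() -> state = State.JOINING);
    }
}
```

In this sketch the member stays in FENCED for as long as the callback is pending, and the transition to JOINING happens when the future is completed.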
    • transitionToFatal

      public void transitionToFatal()
      Transition the member to the FAILED state and update the member info as required. This is invoked when unrecoverable errors occur (e.g. when the heartbeat returns a non-retriable error).
      Specified by:
      transitionToFatal in interface MembershipManager
    • onSubscriptionUpdated

      public void onSubscriptionUpdated()
      Join the group with the updated subscription, if the member is not part of it yet. If the member is already part of the group, this will only ensure that the updated subscription is included in the next heartbeat request.

      Note that the list of topics of the subscription is taken from the shared subscription state.

      Specified by:
      onSubscriptionUpdated in interface MembershipManager
    • transitionToJoining

      public void transitionToJoining()
      Transition to the MemberState.JOINING state, indicating that the member will try to join the group on the next heartbeat request. This is expected to be invoked when the user calls the subscribe API, or when the member wants to rejoin after getting fenced. Visible for testing.
      Specified by:
      transitionToJoining in interface MembershipManager
    • leaveGroup

      public CompletableFuture<Void> leaveGroup()
      Release the assignment and transition to MemberState.PREPARE_LEAVING so that a heartbeat request is sent, indicating to the broker that the member wants to leave the group. This is expected to be invoked when the user calls the unsubscribe API.
      Specified by:
      leaveGroup in interface MembershipManager
      Returns:
      Future that will complete when the callback execution completes and the heartbeat to leave the group has been sent out.
    • shouldHeartbeatNow

      public boolean shouldHeartbeatNow()
      Specified by:
      shouldHeartbeatNow in interface MembershipManager
      Returns:
      True if the member should send a heartbeat to the coordinator without waiting for the interval.
    • onHeartbeatRequestSent

      public void onHeartbeatRequestSent()
      Update state when a heartbeat is sent out. This will transition out of the states that end when a heartbeat request is sent, without waiting for a response (e.g. MemberState.ACKNOWLEDGING and MemberState.LEAVING).
      Specified by:
      onHeartbeatRequestSent in interface MembershipManager
    • onHeartbeatRequestSkipped

      public void onHeartbeatRequestSkipped()
      Transition out of the MemberState.LEAVING state even if the heartbeat was not sent. This ensures that the member is not blocked on MemberState.LEAVING (best effort to send the request, without any response handling or retry logic).
      Specified by:
      onHeartbeatRequestSkipped in interface MembershipManager
    • shouldSkipHeartbeat

      public boolean shouldSkipHeartbeat()
      Specified by:
      shouldSkipHeartbeat in interface MembershipManager
      Returns:
      True if the member should skip sending the heartbeat to the coordinator. This could be the case when the member is not in a group, or when it failed with a fatal error.
    • transitionToStale

      public void transitionToStale()
      Sets the epoch to the leave group epoch and clears the assignments. The member will rejoin with the existing subscriptions the next time the user polls.
      Specified by:
      transitionToStale in interface MembershipManager
    • consumerRebalanceListenerCallbackCompleted

      public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent event)
      Description copied from interface: MembershipManager
      Signals that a ConsumerRebalanceListener callback has completed. This is invoked when the application thread has completed the callback and has submitted a ConsumerRebalanceListenerCallbackCompletedEvent to the network I/O thread. At this point, we notify the state machine that it's complete so that it can move to the next appropriate step of the rebalance process.
      Specified by:
      consumerRebalanceListenerCallbackCompleted in interface MembershipManager
      Parameters:
      event - Event with details about the callback that was executed
    • state

      public MemberState state()
      Specified by:
      state in interface MembershipManager
      Returns:
      Current state of this member in relationship to a consumer group, as defined in MemberState.
    • serverAssignor

      public Optional<String> serverAssignor()
      Specified by:
      serverAssignor in interface MembershipManager
      Returns:
      Server-side assignor implementation configured for the member, that will be sent out to the server to be used. If empty, then the server will select the assignor.
    • currentAssignment

      public Map<org.apache.kafka.common.Uuid,SortedSet<Integer>> currentAssignment()
      Specified by:
      currentAssignment in interface MembershipManager
      Returns:
      Current assignment for the member as received from the broker (topic IDs and partitions). This is the last assignment that the member has successfully reconciled.
    • onUpdate

      public void onUpdate(org.apache.kafka.common.ClusterResource clusterResource)
      When cluster metadata is updated, try to resolve topic names for topic IDs received in an assignment that haven't been resolved yet.
      • Try to find topic names for all unresolved assignments
      • Add discovered topic names to the local topic names cache
      • If any topics are resolved, trigger a reconciliation process
      • If some topics still remain unresolved, request another metadata update
      Specified by:
      onUpdate in interface org.apache.kafka.common.ClusterResourceListener
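The four steps listed for onUpdate can be sketched with plain collections. This is a minimal sketch under stated assumptions: TopicResolutionSketch and its fields are hypothetical names, java.util.UUID stands in for org.apache.kafka.common.Uuid, and the metadata is passed in as a simple map rather than read from the client's metadata object.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;

// Hypothetical illustration only; java.util.UUID stands in for Kafka's Uuid.
class TopicResolutionSketch {
    // Local cache of topic names discovered so far.
    final Map<UUID, String> topicNamesCache = new HashMap<>();
    // Assigned topic IDs whose names are not known yet.
    final Map<UUID, SortedSet<Integer>> unresolved = new HashMap<>();
    // Assignments resolved to names and ready for a reconciliation.
    final Map<String, SortedSet<Integer>> readyToReconcile = new HashMap<>();

    // Returns true if some topic IDs remain unresolved, i.e. another
    // metadata update should be requested.
    boolean onMetadataUpdate(Map<UUID, String> metadataTopicNames) {
        Iterator<Map.Entry<UUID, SortedSet<Integer>>> it = unresolved.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<UUID, SortedSet<Integer>> entry = it.next();
            String name = metadataTopicNames.get(entry.getKey());
            if (name != null) {
                topicNamesCache.put(entry.getKey(), name);
                readyToReconcile.put(name, entry.getValue());
                it.remove(); // no longer unresolved
            }
        }
        return !unresolved.isEmpty();
    }
}
```

A caller would start a reconciliation when readyToReconcile is non-empty and request another metadata update when the method returns true.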
    • registerStateListener

      public void registerStateListener(MemberStateListener listener)
      Register a new listener that will be invoked whenever the member state changes, or a new member ID or epoch is received.
      Specified by:
      registerStateListener in interface MembershipManager
      Parameters:
      listener - Listener to invoke.
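The listener registration described above follows the usual observer pattern. The sketch below is an illustration only: StateListenerSketch, its nested Listener interface, and the updateMember method are hypothetical and do not reproduce the real MemberStateListener signature.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration only; the real MemberStateListener differs.
class StateListenerSketch {
    // Assumed callback shape for this sketch.
    interface Listener {
        void onChange(String memberId, int epoch);
    }

    private final List<Listener> listeners = new ArrayList<>();
    private String memberId = "";
    private int epoch = 0;

    void registerStateListener(Listener listener) {
        listeners.add(Objects.requireNonNull(listener));
    }

    // Notifies listeners only when the member ID or epoch actually changes.
    void updateMember(String newMemberId, int newEpoch) {
        boolean changed = !newMemberId.equals(memberId) || newEpoch != epoch;
        memberId = newMemberId;
        epoch = newEpoch;
        if (changed) {
            for (Listener listener : listeners) {
                listener.onChange(memberId, epoch);
            }
        }
    }
}
```

Other components that need the latest member ID and epoch could register a listener in this way instead of polling the manager.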