Class ConsumerCoordinator
java.lang.Object
org.apache.kafka.clients.consumer.internals.AbstractCoordinator
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator
- All Implemented Interfaces:
Closeable,AutoCloseable
This class manages the coordination process with the consumer coordinator.
-
Nested Class Summary
Nested classes/interfaces inherited from class org.apache.kafka.clients.consumer.internals.AbstractCoordinator
AbstractCoordinator.CoordinatorResponseHandler<R,T>, AbstractCoordinator.Generation, AbstractCoordinator.MemberState -
Field Summary
Fields inherited from class org.apache.kafka.clients.consumer.internals.AbstractCoordinator
client, HEARTBEAT_THREAD_PREFIX, JOIN_GROUP_TIMEOUT_LAPSE, retryBackoff, state, time -
Constructor Summary
ConstructorsConstructorDescriptionConsumerCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, List<ConsumerPartitionAssignor> assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean throwOnFetchStableOffsetsUnsupported, String rackId, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter) Initialize the coordination manager. -
Method Summary
Modifier and TypeMethodDescriptionvoidclose(org.apache.kafka.common.utils.Timer timer) commitOffsetsAsync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) booleancommitOffsetsSync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets, org.apache.kafka.common.utils.Timer timer) Commit offsets synchronously.Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<org.apache.kafka.common.TopicPartition> partitions, org.apache.kafka.common.utils.Timer timer) Fetch the current committed offsets from the coordinator for a set of partitions.Return the consumer group metadata.booleaninitWithCommittedOffsetsIfNeeded(org.apache.kafka.common.utils.Timer timer) Refresh the committed offsets for partitions that require initialization.voidmaybeAutoCommitOffsetsAsync(long now) protected org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollectionmetadata()Get the current list of protocols and their associated metadata supported by the local member.protected voidonJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) Invoked when a group member has successfully joined a group.protected booleanonJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId) Invoked prior to each group join or rejoin.protected Map<String, ByteBuffer> onLeaderElected(String leaderId, String assignmentStrategy, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions, boolean skipAssignment) Invoked when the leader is elected.voidInvoked prior to each leave group event.booleanpoll(org.apache.kafka.common.utils.Timer timer, boolean waitForJoinGroup) Poll for coordinator events.Unique identifier for the class of supported protocols (e.g.booleanCheck whether the group should be rejoined (e.g.longtimeToNextPoll(long now) Return the time to the next needed invocation ofConsumerNetworkClient.poll(Timer).voidupdatePatternSubscription(org.apache.kafka.common.Cluster cluster) Methods inherited from class org.apache.kafka.clients.consumer.internals.AbstractCoordinator
checkAndGetCoordinator, close, coordinatorUnknown, createMeter, ensureActiveGroup, ensureCoordinatorReady, ensureCoordinatorReadyAsync, generation, generationIfStable, isDynamicMember, lookupCoordinator, markCoordinatorUnknown, markCoordinatorUnknown, markCoordinatorUnknown, maybeLeaveGroup, memberId, pollHeartbeat, rebalanceInProgress, requestRejoin, requestRejoin, requestRejoinIfNecessary, timeToNextHeartbeat
-
Constructor Details
-
ConsumerCoordinator
public ConsumerCoordinator(org.apache.kafka.clients.GroupRebalanceConfig rebalanceConfig, org.apache.kafka.common.utils.LogContext logContext, ConsumerNetworkClient client, List<ConsumerPartitionAssignor> assignors, ConsumerMetadata metadata, SubscriptionState subscriptions, org.apache.kafka.common.metrics.Metrics metrics, String metricGrpPrefix, org.apache.kafka.common.utils.Time time, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean throwOnFetchStableOffsetsUnsupported, String rackId, Optional<org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter> clientTelemetryReporter) Initialize the coordination manager.
-
-
Method Details
-
protocolType
Description copied from class:AbstractCoordinatorUnique identifier for the class of supported protocols (e.g. "consumer" or "connect").- Specified by:
protocolTypein classAbstractCoordinator- Returns:
- Non-null protocol type name
-
metadata
protected org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata()Description copied from class:AbstractCoordinatorGet the current list of protocols and their associated metadata supported by the local member. The order of the protocols in the list indicates the preference of the protocol (the first entry is the most preferred). The coordinator takes this preference into account when selecting the generation protocol (generally more preferred protocols will be selected as long as all members support them and there is no disagreement on the preference).- Specified by:
metadatain classAbstractCoordinator- Returns:
- Non-empty map of supported protocols and metadata
-
updatePatternSubscription
public void updatePatternSubscription(org.apache.kafka.common.Cluster cluster) -
onJoinComplete
protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) Description copied from class:AbstractCoordinatorInvoked when a group member has successfully joined a group. If this call fails with an exception, then it will be retried using the same assignment state on the next call toAbstractCoordinator.ensureActiveGroup().- Specified by:
onJoinCompletein classAbstractCoordinator- Parameters:
generation- The generation that was joinedmemberId- The identifier for the local member in the groupassignmentStrategy- The protocol selected by the coordinatorassignmentBuffer- The assignment propagated from the group leader
-
poll
public boolean poll(org.apache.kafka.common.utils.Timer timer, boolean waitForJoinGroup) Poll for coordinator events. This ensures that the coordinator is known and that the consumer has joined the group (if it is using group management). This also handles periodic offset commits if they are enabled.Returns early if the timeout expires or if waiting on rejoin is not required
- Parameters:
timer- Timer bounding how long this method can blockwaitForJoinGroup- Boolean flag indicating if we should wait until re-join group completes- Returns:
- true iff the operation succeeded
- Throws:
org.apache.kafka.common.KafkaException- if the rebalance callback throws an exception
-
timeToNextPoll
public long timeToNextPoll(long now) Return the time to the next needed invocation ofConsumerNetworkClient.poll(Timer).- Parameters:
now- current time in milliseconds- Returns:
- the maximum time in milliseconds the caller should wait before the next invocation of poll()
-
onLeaderElected
protected Map<String,ByteBuffer> onLeaderElected(String leaderId, String assignmentStrategy, List<org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember> allSubscriptions, boolean skipAssignment) Description copied from class:AbstractCoordinatorInvoked when the leader is elected. This is used by the leader to perform the assignment if necessary and to push state to all the members of the group (e.g. to push partition assignments in the case of the new consumer)- Specified by:
onLeaderElectedin classAbstractCoordinator- Parameters:
leaderId- The id of the leader (which is this member)assignmentStrategy- The protocol selected by the coordinatorallSubscriptions- Metadata from all members of the groupskipAssignment- True if leader must skip running the assignor- Returns:
- A map from each member to their state assignment
-
onJoinPrepare
protected boolean onJoinPrepare(org.apache.kafka.common.utils.Timer timer, int generation, String memberId) Description copied from class:AbstractCoordinatorInvoked prior to each group join or rejoin. This is typically used to perform any cleanup from the previous generation (such as committing offsets for the consumer)- Specified by:
onJoinPreparein classAbstractCoordinator- Parameters:
timer- Timer bounding how long this method can blockgeneration- The previous generation or -1 if there was nonememberId- The identifier of this member in the previous group or "" if there was none- Returns:
- true If onJoinPrepare async commit succeeded, false otherwise
-
onLeavePrepare
public void onLeavePrepare()Description copied from class:AbstractCoordinatorInvoked prior to each leave group event. This is typically used to cleanup assigned partitions; note it is triggered by the consumer's API caller thread (i.e. background heartbeat thread would not trigger it even if it tries to force leaving group upon heartbeat session expiration)- Overrides:
onLeavePreparein classAbstractCoordinator
-
rejoinNeededOrPending
public boolean rejoinNeededOrPending()Description copied from class:AbstractCoordinatorCheck whether the group should be rejoined (e.g. if metadata changes) or whether a rejoin request is already in flight and needs to be completed.- Overrides:
rejoinNeededOrPendingin classAbstractCoordinator- Returns:
- true if it should, false otherwise
- Throws:
org.apache.kafka.common.KafkaException- if the callback throws exception
-
initWithCommittedOffsetsIfNeeded
public boolean initWithCommittedOffsetsIfNeeded(org.apache.kafka.common.utils.Timer timer) Refresh the committed offsets for partitions that require initialization.- Parameters:
timer- Timer bounding how long this method can block- Returns:
- true iff the operation completed within the timeout
-
fetchCommittedOffsets
public Map<org.apache.kafka.common.TopicPartition,OffsetAndMetadata> fetchCommittedOffsets(Set<org.apache.kafka.common.TopicPartition> partitions, org.apache.kafka.common.utils.Timer timer) Fetch the current committed offsets from the coordinator for a set of partitions.- Parameters:
partitions- The partitions to fetch offsets for- Returns:
- A map from partition to the committed offset or null if the operation timed out
-
groupMetadata
Return the consumer group metadata.- Returns:
- the current consumer group metadata
-
close
public void close(org.apache.kafka.common.utils.Timer timer) - Overrides:
closein classAbstractCoordinator- Throws:
org.apache.kafka.common.KafkaException- if the rebalance callback throws exception
-
commitOffsetsAsync
public RequestFuture<Void> commitOffsetsAsync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) -
commitOffsetsSync
public boolean commitOffsetsSync(Map<org.apache.kafka.common.TopicPartition, OffsetAndMetadata> offsets, org.apache.kafka.common.utils.Timer timer) Commit offsets synchronously. This method will retry until the commit completes successfully or an unrecoverable error is encountered.- Parameters:
offsets- The offsets to be committed- Returns:
- If the offset commit was successfully sent and a successful response was received from the coordinator
- Throws:
org.apache.kafka.common.errors.AuthorizationException- if the consumer is not authorized to the group or to any of the specified partitions. See the exception for more detailsCommitFailedException- if an unrecoverable error occurs before the commit can be completedorg.apache.kafka.common.errors.FencedInstanceIdException- if a static member gets fenced
-
maybeAutoCommitOffsetsAsync
public void maybeAutoCommitOffsetsAsync(long now)
-