Class KafkaConnectionImpl
- All Implemented Interfaces:
KafkaConnection
KafkaConnection that loads Kafka properties from a file,
provides factory methods for creating clients, and tracks them for graceful shutdown.- See Also:
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionCreates a newAdminClientusing the base configuration properties.createAdminClient(@NotNull Properties overrides) Creates a newAdminClientwith the specified property overrides.Creates a newKafkaConsumerwithStringkey and value deserializers.<K,V> @NotNull KafkaConsumer <K, V> createConsumer(@NotNull Properties overrides) Creates a newKafkaConsumerwith the specified property overrides.Creates a newKafkaProducerwithStringkey and value serializers.<K,V> @NotNull KafkaProducer <K, V> createProducer(@NotNull Properties overrides) Creates a newKafkaProducerwith the specified property overrides.Returns a defensive copy of the loaded Kafka properties.voidinit()Initializes the connection by loading and validating the configuration properties.booleanReturns whether this connection is configured to cause server shutdown on initialization failure.voidshutdown()Shuts down all tracked Kafka clients gracefully with the configured timeout.
-
Constructor Details
-
KafkaConnectionImpl
public KafkaConnectionImpl(@NotNull @NotNull Logger logger, @NotNull @NotNull String id, @NotNull @NotNull String configFilePath) Creates a new Kafka connection instance.- Parameters:
logger- the logger for diagnostic messagesid- the connection identifierconfigFilePath- the path to the.propertiesconfiguration file
-
-
Method Details
-
init
Description copied from interface:KafkaConnectionInitializes the connection by loading and validating the configuration properties.- Specified by:
initin interfaceKafkaConnection- Throws:
ConnectionInitException- if the properties file cannot be loaded orbootstrap.serversis missing
-
getProperties
Description copied from interface:KafkaConnectionReturns a defensive copy of the loaded Kafka properties.Modifications to the returned
Propertiesobject do not affect the internal configuration.- Specified by:
getPropertiesin interfaceKafkaConnection- Returns:
- a copy of the Kafka configuration properties
-
createProducer
Description copied from interface:KafkaConnectionCreates a newKafkaProducerwithStringkey and value serializers.The returned producer is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createProducerin interfaceKafkaConnection- Returns:
- a new Kafka producer configured with the loaded properties
-
createProducer
@NotNull public <K,V> @NotNull KafkaProducer<K,V> createProducer(@NotNull @NotNull Properties overrides) Description copied from interface:KafkaConnectionCreates a newKafkaProducerwith the specified property overrides.Override properties are merged on top of the base configuration (overrides win). The returned producer is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createProducerin interfaceKafkaConnection- Type Parameters:
K- the key typeV- the value type- Parameters:
overrides- additional properties to merge (e.g., custom serializers)- Returns:
- a new Kafka producer
-
createConsumer
Description copied from interface:KafkaConnectionCreates a newKafkaConsumerwithStringkey and value deserializers.The returned consumer is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createConsumerin interfaceKafkaConnection- Returns:
- a new Kafka consumer configured with the loaded properties
-
createConsumer
@NotNull public <K,V> @NotNull KafkaConsumer<K,V> createConsumer(@NotNull @NotNull Properties overrides) Description copied from interface:KafkaConnectionCreates a newKafkaConsumerwith the specified property overrides.Override properties are merged on top of the base configuration (overrides win). The returned consumer is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createConsumerin interfaceKafkaConnection- Type Parameters:
K- the key typeV- the value type- Parameters:
overrides- additional properties to merge (e.g.,group.id, custom deserializers)- Returns:
- a new Kafka consumer
-
createAdminClient
Description copied from interface:KafkaConnectionCreates a newAdminClientusing the base configuration properties.The returned client is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createAdminClientin interfaceKafkaConnection- Returns:
- a new Kafka admin client
-
createAdminClient
Description copied from interface:KafkaConnectionCreates a newAdminClientwith the specified property overrides.The returned client is tracked and will be closed on
KafkaConnection.shutdown().- Specified by:
createAdminClientin interfaceKafkaConnection- Parameters:
overrides- additional properties to merge- Returns:
- a new Kafka admin client
-
shutdown
public void shutdown()Description copied from interface:KafkaConnectionShuts down all tracked Kafka clients gracefully with the configured timeout.The timeout is controlled by the
shutdownTimeoutMsproperty (default:30000milliseconds).- Specified by:
shutdownin interfaceKafkaConnection
-
isExitOnFailure
public boolean isExitOnFailure()Description copied from interface:KafkaConnectionReturns whether this connection is configured to cause server shutdown on initialization failure.- Specified by:
isExitOnFailurein interfaceKafkaConnection- Returns:
trueif the server should exit when this connection fails to initialize
-