Interface KafkaConnection
- All Known Implementing Classes:
KafkaConnectionImpl
Unlike other connector types, Kafka connections do not maintain a persistent connection.
Instead, they load and validate a .properties file and provide factory methods
for creating KafkaProducer, KafkaConsumer, and AdminClient instances.
All clients created through factory methods are tracked and will be closed gracefully
when shutdown() is called.
Configuration is loaded from a .properties file referenced by the
CONNECTOR_KAFKA_<ID>_CONFIG environment variable. The bootstrap.servers
property is required and validated at initialization. An optional shutdownTimeoutMs
property controls the graceful shutdown timeout (default: 30000).
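The loading and validation rules above can be sketched with the standard library alone. This is a minimal illustration, not the actual KafkaConnectionImpl code: the class KafkaConfigSketch and its method names are hypothetical, IllegalStateException stands in for the real exception type, and the sketch reads from a Reader rather than the file named by the environment variable so that it runs without any setup.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the documented interface.
final class KafkaConfigSketch {

    static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 30_000L;

    // Builds the environment variable name for a connector ID such as "MAIN".
    // Assumes the ID is substituted verbatim into CONNECTOR_KAFKA_<ID>_CONFIG.
    static String envVarName(String id) {
        return "CONNECTOR_KAFKA_" + id + "_CONFIG";
    }

    // Loads the properties and rejects a configuration without bootstrap.servers.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String servers = props.getProperty("bootstrap.servers");
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalStateException("bootstrap.servers is required");
        }
        return props;
    }

    // Reads the optional shutdownTimeoutMs property, falling back to 30000.
    static long shutdownTimeoutMs(Properties props) {
        String raw = props.getProperty("shutdownTimeoutMs");
        return raw == null ? DEFAULT_SHUTDOWN_TIMEOUT_MS : Long.parseLong(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = load(new StringReader("bootstrap.servers=localhost:9092\n"));
        System.out.println(envVarName("MAIN"));
        System.out.println(shutdownTimeoutMs(props));
    }
}
```

Whether the real implementation removes shutdownTimeoutMs from the properties before handing them to the Kafka clients is not stated here, so the sketch leaves the loaded properties untouched.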
Usage example:
KafkaConnection conn = service.getConnection("MAIN").orElseThrow();
// No-arg: defaults to String serialization
KafkaProducer<String, String> producer = conn.createProducer();
// With overrides for custom serializers or consumer group
Properties overrides = new Properties();
overrides.put("group.id", "my-group");
KafkaConsumer<String, String> consumer = conn.createConsumer(overrides);
-
Method Summary
Modifier and Type | Method | Description
AdminClient | createAdminClient() | Creates a new AdminClient using the base configuration properties.
AdminClient | createAdminClient(@NotNull Properties overrides) | Creates a new AdminClient with the specified property overrides.
KafkaConsumer<String, String> | createConsumer() | Creates a new KafkaConsumer with String key and value deserializers.
<K,V> @NotNull KafkaConsumer<K, V> | createConsumer(@NotNull Properties overrides) | Creates a new KafkaConsumer with the specified property overrides.
KafkaProducer<String, String> | createProducer() | Creates a new KafkaProducer with String key and value serializers.
<K,V> @NotNull KafkaProducer<K, V> | createProducer(@NotNull Properties overrides) | Creates a new KafkaProducer with the specified property overrides.
Properties | getProperties() | Returns a defensive copy of the loaded Kafka properties.
void | init() | Initializes the connection by loading and validating the configuration properties.
boolean | isExitOnFailure() | Returns whether this connection is configured to cause server shutdown on initialization failure.
void | shutdown() | Shuts down all tracked Kafka clients gracefully with the configured timeout.
-
Method Details
-
getProperties
Returns a defensive copy of the loaded Kafka properties. Modifications to the returned Properties object do not affect the internal configuration.
- Returns:
a copy of the Kafka configuration properties
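A defensive copy of this kind can be sketched as follows. The class DefensiveCopySketch and its hard-coded property are hypothetical stand-ins, and the real implementation may copy the properties differently.

```java
import java.util.Properties;

// Hypothetical stand-in for a connection that holds loaded Kafka properties.
final class DefensiveCopySketch {

    private final Properties loaded = new Properties();

    DefensiveCopySketch() {
        // Placeholder value; a real connection would load this from its .properties file.
        loaded.setProperty("bootstrap.servers", "localhost:9092");
    }

    // Returns a fresh Properties object, so callers cannot mutate the internal state.
    Properties getProperties() {
        Properties copy = new Properties();
        copy.putAll(loaded);
        return copy;
    }

    public static void main(String[] args) {
        DefensiveCopySketch conn = new DefensiveCopySketch();
        Properties copy = conn.getProperties();
        copy.setProperty("bootstrap.servers", "changed:9092");
        // The internal configuration still holds the original value.
        System.out.println(conn.getProperties().getProperty("bootstrap.servers"));
    }
}
```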
-
createProducer
Creates a new KafkaProducer with String key and value serializers. The returned producer is tracked and will be closed on shutdown().
- Returns:
a new Kafka producer configured with the loaded properties
-
createProducer
Creates a new KafkaProducer with the specified property overrides. Override properties are merged on top of the base configuration (overrides win). The returned producer is tracked and will be closed on shutdown().
- Type Parameters:
K - the key type
V - the value type
- Parameters:
overrides - additional properties to merge (e.g., custom serializers)
- Returns:
a new Kafka producer
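The "overrides win" merge can be pictured with plain java.util.Properties. This is a sketch under assumptions: OverrideMergeSketch and merge are hypothetical names, and the actual implementation may build the merged configuration differently. The serializer class names are the standard ones from the Kafka client library, used here only as string values.

```java
import java.util.Properties;

// Hypothetical illustration of merging overrides on top of a base configuration.
final class OverrideMergeSketch {

    // Copies the base first, then the overrides, so an override replaces any
    // base entry with the same key. Neither input is modified.
    static Properties merge(Properties base, Properties overrides) {
        Properties merged = new Properties();
        merged.putAll(base);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");
        base.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Properties overrides = new Properties();
        overrides.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        Properties merged = merge(base, overrides);
        System.out.println(merged.getProperty("value.serializer"));
    }
}
```

Keys that appear only in the base configuration, such as bootstrap.servers, carry over unchanged.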
-
createConsumer
Creates a new KafkaConsumer with String key and value deserializers. The returned consumer is tracked and will be closed on shutdown().
- Returns:
a new Kafka consumer configured with the loaded properties
-
createConsumer
Creates a new KafkaConsumer with the specified property overrides. Override properties are merged on top of the base configuration (overrides win). The returned consumer is tracked and will be closed on shutdown().
- Type Parameters:
K - the key type
V - the value type
- Parameters:
overrides - additional properties to merge (e.g., group.id, custom deserializers)
- Returns:
a new Kafka consumer
-
createAdminClient
Creates a new AdminClient using the base configuration properties. The returned client is tracked and will be closed on shutdown().
- Returns:
a new Kafka admin client
-
createAdminClient
Creates a new AdminClient with the specified property overrides. The returned client is tracked and will be closed on shutdown().
- Parameters:
overrides - additional properties to merge
- Returns:
a new Kafka admin client
-
init
Initializes the connection by loading and validating the configuration properties.
- Throws:
ConnectionInitException - if the properties file cannot be loaded or bootstrap.servers is missing
-
shutdown
void shutdown()
Shuts down all tracked Kafka clients gracefully with the configured timeout. The timeout is controlled by the shutdownTimeoutMs property (default: 30000 milliseconds).
-
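One way to picture client tracking and timed shutdown is the sketch below. It is not the actual implementation: TrackedClientsSketch and TimedCloseable are hypothetical, with TimedCloseable standing in for the close(Duration) method that Kafka producers, consumers, and admin clients expose. The sketch also assumes the timeout is one overall budget shared by all clients; the documentation above does not say whether the real timeout applies per client or in total.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry that tracks clients and closes them on shutdown.
final class TrackedClientsSketch {

    // Stand-in for a Kafka client's close(Duration) method.
    interface TimedCloseable {
        void close(Duration timeout);
    }

    private final List<TimedCloseable> tracked = new CopyOnWriteArrayList<>();
    private final long shutdownTimeoutMs;

    TrackedClientsSketch(long shutdownTimeoutMs) {
        this.shutdownTimeoutMs = shutdownTimeoutMs;
    }

    // Called by each factory method right after it creates a client.
    void track(TimedCloseable client) {
        tracked.add(client);
    }

    // Closes every tracked client against one shared deadline (an assumption).
    // A failure in one client does not prevent the others from being closed.
    // Returns the number of clients that closed without throwing.
    int shutdown() {
        long deadline = System.nanoTime() + Duration.ofMillis(shutdownTimeoutMs).toNanos();
        int closed = 0;
        for (TimedCloseable client : tracked) {
            long remainingNanos = Math.max(0L, deadline - System.nanoTime());
            try {
                client.close(Duration.ofNanos(remainingNanos));
                closed++;
            } catch (RuntimeException e) {
                // A real implementation would log this; the sketch just moves on.
            }
        }
        tracked.clear();
        return closed;
    }
}
```

Clearing the list at the end makes a second shutdown() call a no-op, which is a design choice of this sketch rather than documented behavior.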
isExitOnFailure
boolean isExitOnFailure()
Returns whether this connection is configured to cause server shutdown on initialization failure.
- Returns:
true if the server should exit when this connection fails to initialize
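How a caller might act on this flag during startup is sketched below. Everything here is an assumption for illustration: StartupSketch and its nested Connection interface are hypothetical, and RuntimeException stands in for ConnectionInitException, whose exact type hierarchy is not shown on this page.

```java
import java.util.List;

// Hypothetical startup routine that honors isExitOnFailure().
final class StartupSketch {

    // Minimal stand-in exposing only the two documented methods used here.
    interface Connection {
        void init();
        boolean isExitOnFailure();
    }

    // Initializes each connection in order. Returns false as soon as a
    // connection marked exit-on-failure fails; other failures are skipped.
    static boolean initAll(List<Connection> connections) {
        for (Connection connection : connections) {
            try {
                connection.init();
            } catch (RuntimeException e) {
                if (connection.isExitOnFailure()) {
                    return false;
                }
                // Non-critical connection: a real server would log and continue.
            }
        }
        return true;
    }
}
```

A server using this pattern would stop its startup sequence when initAll returns false.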
-