Interface KafkaConnection

All Known Implementing Classes:
KafkaConnectionImpl

public interface KafkaConnection
Represents a managed Kafka connection that exposes the loaded configuration properties and provides factory methods for creating Kafka clients.

Unlike other connector types, Kafka connections do not maintain a persistent connection. Instead, they load and validate a .properties file and provide factory methods for creating KafkaProducer, KafkaConsumer, and AdminClient instances.

All clients created through factory methods are tracked and will be closed gracefully when shutdown() is called.

Configuration is loaded from a .properties file referenced by the CONNECTOR_KAFKA_<ID>_CONFIG environment variable. The bootstrap.servers property is required and validated at initialization. An optional shutdownTimeoutMs property controls the graceful shutdown timeout (default: 30000).

Usage example:


 KafkaConnection conn = service.getConnection("MAIN").orElseThrow();

 // No-arg: defaults to String serialization
 KafkaProducer<String, String> producer = conn.createProducer();

 // With overrides for custom serializers or consumer group
 Properties overrides = new Properties();
 overrides.put("group.id", "my-group");
 KafkaConsumer<String, String> consumer = conn.createConsumer(overrides);
 
See Also:
  • Method Details

    • getProperties

      @NotNull @NotNull Properties getProperties()
      Returns a defensive copy of the loaded Kafka properties.

      Modifications to the returned Properties object do not affect the internal configuration.

      Returns:
      a copy of the Kafka configuration properties
    • createProducer

      Creates a new KafkaProducer with String key and value serializers.

      The returned producer is tracked and will be closed on shutdown().

      Returns:
      a new Kafka producer configured with the loaded properties
    • createProducer

      @NotNull <K, V> @NotNull KafkaProducer<K,V> createProducer(@NotNull @NotNull Properties overrides)
      Creates a new KafkaProducer with the specified property overrides.

      Override properties are merged on top of the base configuration (overrides win). The returned producer is tracked and will be closed on shutdown().

      Type Parameters:
      K - the key type
      V - the value type
      Parameters:
      overrides - additional properties to merge (e.g., custom serializers)
      Returns:
      a new Kafka producer
    • createConsumer

      Creates a new KafkaConsumer with String key and value deserializers.

      The returned consumer is tracked and will be closed on shutdown().

      Returns:
      a new Kafka consumer configured with the loaded properties
    • createConsumer

      @NotNull <K, V> @NotNull KafkaConsumer<K,V> createConsumer(@NotNull @NotNull Properties overrides)
      Creates a new KafkaConsumer with the specified property overrides.

      Override properties are merged on top of the base configuration (overrides win). The returned consumer is tracked and will be closed on shutdown().

      Type Parameters:
      K - the key type
      V - the value type
      Parameters:
      overrides - additional properties to merge (e.g., group.id, custom deserializers)
      Returns:
      a new Kafka consumer
    • createAdminClient

      @NotNull @NotNull AdminClient createAdminClient()
      Creates a new AdminClient using the base configuration properties.

      The returned client is tracked and will be closed on shutdown().

      Returns:
      a new Kafka admin client
    • createAdminClient

      @NotNull @NotNull AdminClient createAdminClient(@NotNull @NotNull Properties overrides)
      Creates a new AdminClient with the specified property overrides.

      The returned client is tracked and will be closed on shutdown().

      Parameters:
      overrides - additional properties to merge
      Returns:
      a new Kafka admin client
    • init

      void init() throws ConnectionInitException
      Initializes the connection by loading and validating the configuration properties.
      Throws:
      ConnectionInitException - if the properties file cannot be loaded or bootstrap.servers is missing
    • shutdown

      void shutdown()
      Shuts down all tracked Kafka clients gracefully with the configured timeout.

      The timeout is controlled by the shutdownTimeoutMs property (default: 30000 milliseconds).

    • isExitOnFailure

      boolean isExitOnFailure()
      Returns whether this connection is configured to cause server shutdown on initialization failure.
      Returns:
      true if the server should exit when this connection fails to initialize