Service discovery

By using Akka Discovery Alpakka Kafka may read the Kafka bootstrap server addresses from any Akka Discovery-compatible service discovery mechanism.

Akka Discovery supports Configuration (HOCON), DNS (SRV records), and aggregation of multiple discovery methods out-of-the-box. Kubernetes API, AWS API: EC2 Tag-Based Discovery, AWS API: ECS Discovery and Consul implementations for Akka Discovery are available in Akka Management.

Dependency

The Akka Discovery version must match the Akka version used in your build. To use the implementations provided by Akka Management, you need to add the desired dependency.

Maven
<properties>
  <akka.version>2.5.23</akka.version>
</properties>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-discovery_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
sbt
val AkkaVersion = "2.5.23"
libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
Gradle
versions += [
  AkkaVersion: "2.5.23"
]
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-discovery_2.12', version: versions.AkkaVersion
}

Configure consumer settings

To use Akka Discovery with Alpakka Kafka consumers, configure a section for your consumer settings which inherits the default settings (by using ${akka.kafka.consumer}) and add a service name and a timeout for the service lookup. Setting the service-name in the akka.kafka.consumer config will work, if all your consumers connect to the same Kafka broker.

The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout depending on the discovery technology used, and your environment.

Note that consumers and producers may share a service (as shown in the examples on this page).

application.conf
discovery-consumer: ${akka.kafka.consumer} {
  service-name = "kafkaService1"
}

Mount the DiscoverySupportDiscoverySupport in your consumer settings:

Scala
import akka.kafka.scaladsl.DiscoverySupport

val consumerConfig = config.getConfig("discovery-consumer")
val settings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
  .withEnrichAsync(DiscoverySupport.consumerBootstrapServers(consumerConfig))
Java
import akka.kafka.javadsl.DiscoverySupport;

Config consumerConfig = system.settings().config().getConfig("discovery-consumer");
ConsumerSettings<String, String> settings =
    ConsumerSettings.create(consumerConfig, new StringDeserializer(), new StringDeserializer())
        .withEnrichCompletionStage(
            DiscoverySupport.consumerBootstrapServers(consumerConfig, system));

Configure producer settings

To use Akka Discovery with Alpakka Kafka producers, configure a section for your producer settings which inherits the default settings (by using ${akka.kafka.producer}) and add a service name and a timeout for the service lookup. Setting the service-name in the akka.kafka.producer config will work, if all your producers connect to the same Kafka broker.

The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout depending on the discovery technology used, and your environment.

Note that consumers and producers may share a service (as shown in the examples on this page).

application.conf
discovery-producer: ${akka.kafka.producer} {
  service-name = "kafkaService1"
}

Mount the DiscoverySupportDiscoverySupport in your producer settings:

Scala
import akka.kafka.scaladsl.DiscoverySupport

val producerConfig = config.getConfig("discovery-producer")
val settings = ProducerSettings(producerConfig, new StringSerializer, new StringSerializer)
  .withEnrichAsync(DiscoverySupport.producerBootstrapServers(producerConfig))
Java
import akka.kafka.javadsl.DiscoverySupport;

Config producerConfig = system.settings().config().getConfig("discovery-producer");
ProducerSettings<String, String> settings =
    ProducerSettings.create(producerConfig, new StringSerializer(), new StringSerializer())
        .withEnrichCompletionStage(
            DiscoverySupport.producerBootstrapServers(producerConfig, system));

Provide a service name via environment variables

To set the service name for lookup of the Kafka brokers bootstrap addresses via environment variables, use the built-in s support in Typesafe Config as below. This example will use the value from the environment variable KAFKA_SERVICE_NAME and in case that is not defined default to kafkaServiceDefault.

application.conf
akka.kafka.producer {
  service-name = "kafkaServiceDefault"
  service-name = ${?KAFKA_SERVICE_NAME}
}

akka.kafka.consumer {
  service-name = "kafkaServiceDefault"
  service-name = ${?KAFKA_SERVICE_NAME}
}

Specify a different service discovery mechanism

The Actor System-wide service discovery is used by default, to choose a different Akka Discovery implementation, set the discovery-method setting in the producer and consumer configurations accordingly.

application.conf
discovery-producer: ${akka.kafka.producer} {
  discovery-method = "kubernetes-api"
  service-name = "kafkaService1"
  resolve-timeout = 3 seconds
}

Use Config (HOCON) to describe the bootstrap servers

The setup below uses the built-in Akka Discovery implementation reading from Config (HOCON) files. That might be a good choice for development and testing. You may use the Aggregate implementation to first use another discovery technology, before falling back to the config file.

application.conf
akka.discovery.method = config
akka.discovery.config.services = {
  kafkaService1 = {
    endpoints = [
      { host = "cat", port = 1233 }
      { host = "dog", port = 1234 }
    ]
  }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.