Service discovery
By using Akka Discovery Alpakka Kafka may read the Kafka bootstrap server addresses from any Akka Discovery-compatible service discovery mechanism.
Akka Discovery supports Configuration (HOCON), DNS (SRV records), and aggregation of multiple discovery methods out-of-the-box. Kubernetes API, AWS API: EC2 Tag-Based Discovery, AWS API: ECS Discovery and Consul implementations for Akka Discovery are available in Akka Management.
Dependency
The Akka Discovery version must match the Akka version used in your build. To use the implementations provided by Akka Management, you need to add the desired dependency.
- Maven
<properties> <akka.version>2.10.0</akka.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-discovery_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> </dependencies>
- sbt
val AkkaVersion = "2.10.0" libraryDependencies += "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
- Gradle
def versions = [ AkkaVersion: "2.10.0", ScalaBinary: "2.13" ] dependencies { implementation "com.typesafe.akka:akka-discovery_${versions.ScalaBinary}:${versions.AkkaVersion}" }
Configure consumer settings
To use Akka Discovery with Alpakka Kafka consumers, configure a section for your consumer settings which inherits the default settings (by using ${akka.kafka.consumer}
) and add a service name and a timeout for the service lookup. Setting the service-name
in the akka.kafka.consumer
config will work, if all your consumers connect to the same Kafka broker.
The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout
depending on the discovery technology used, and your environment.
Note that consumers and producers may share a service (as shown in the examples on this page).
- application.conf
-
discovery-consumer: ${akka.kafka.consumer} { service-name = "kafkaService1" }
Mount the DiscoverySupport
DiscoverySupport
in your consumer settings:
- Scala
-
source
import akka.kafka.scaladsl.DiscoverySupport val consumerConfig = config.getConfig("discovery-consumer") val settings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer) .withEnrichAsync(DiscoverySupport.consumerBootstrapServers(consumerConfig))
- Java
-
source
import akka.kafka.javadsl.DiscoverySupport; Config consumerConfig = system.settings().config().getConfig("discovery-consumer"); ConsumerSettings<String, String> settings = ConsumerSettings.create(consumerConfig, new StringDeserializer(), new StringDeserializer()) .withEnrichCompletionStage( DiscoverySupport.consumerBootstrapServers(consumerConfig, system));
Configure producer settings
To use Akka Discovery with Alpakka Kafka producers, configure a section for your producer settings which inherits the default settings (by using ${akka.kafka.producer}
) and add a service name and a timeout for the service lookup. Setting the service-name
in the akka.kafka.producer
config will work, if all your producers connect to the same Kafka broker.
The service name must match the one configured with the discovery technology you use. Overwrite the resolve-timeout
depending on the discovery technology used, and your environment.
Note that consumers and producers may share a service (as shown in the examples on this page).
- application.conf
-
discovery-producer: ${akka.kafka.producer} { service-name = "kafkaService1" }
Mount the DiscoverySupport
DiscoverySupport
in your producer settings:
- Scala
-
source
import akka.kafka.scaladsl.DiscoverySupport val producerConfig = config.getConfig("discovery-producer") val settings = ProducerSettings(producerConfig, new StringSerializer, new StringSerializer) .withEnrichAsync(DiscoverySupport.producerBootstrapServers(producerConfig))
- Java
-
source
import akka.kafka.javadsl.DiscoverySupport; Config producerConfig = system.settings().config().getConfig("discovery-producer"); ProducerSettings<String, String> settings = ProducerSettings.create(producerConfig, new StringSerializer(), new StringSerializer()) .withEnrichCompletionStage( DiscoverySupport.producerBootstrapServers(producerConfig, system));
Provide a service name via environment variables
To set the service name for lookup of the Kafka brokers bootstrap addresses via environment variables, use the built-in s support in Typesafe Config as below. This example will use the value from the environment variable KAFKA_SERVICE_NAME
and in case that is not defined default to kafkaServiceDefault
.
- application.conf
-
akka.kafka.producer { service-name = "kafkaServiceDefault" service-name = ${?KAFKA_SERVICE_NAME} } akka.kafka.consumer { service-name = "kafkaServiceDefault" service-name = ${?KAFKA_SERVICE_NAME} }
Specify a different service discovery mechanism
The Actor System-wide service discovery is used by default, to choose a different Akka Discovery implementation, set the discovery-method
setting in the producer and consumer configurations accordingly.
- application.conf
-
discovery-producer: ${akka.kafka.producer} { discovery-method = "kubernetes-api" service-name = "kafkaService1" resolve-timeout = 3 seconds }
Use Config (HOCON) to describe the bootstrap servers
The setup below uses the built-in Akka Discovery implementation reading from Config (HOCON) files. That might be a good choice for development and testing. You may use the Aggregate implementation to first use another discovery technology, before falling back to the config file.
- application.conf
-
source
akka.discovery.method = config akka.discovery.config.services = { kafkaService1 = { endpoints = [ { host = "cat", port = 1233 } { host = "dog", port = 1234 } ] } }