Testing with a Docker Kafka cluster
The Testcontainers project contains a nice API to start and stop Apache Kafka in Docker containers. This becomes very relevant when your application code uses a Scala version which Apache Kafka doesn’t support so that EmbeddedKafka can’t be used. Testcontainers also allow you to create a complete Kafka cluster (using Docker containers) to simulate production use cases.
Settings
You can override testcontainers settings to create multi-broker Kafka clusters, or to finetune Kafka Broker, ZooKeeper and Schema Registry configuration, by updating KafkaTestkitTestcontainersSettings
KafkaTestkitTestcontainersSettings
in code or configuration. The KafkaTestkitTestcontainersSettings
KafkaTestkitTestcontainersSettings
type can be used to perform actions such as:
- Set the docker image and tag of Kafka, ZooKeeper, and Schema Registry version to use (a recent Confluent Platform version is used by default)
- Define number of Kafka brokers
- Overriding container settings and environment variables (i.e. to change default Broker config)
- Apply custom docker configuration to the Kafka and ZooKeeper containers used to create a cluster
To change defaults for all settings update the appropriate configuration in akka.kafka.testkit.testcontainers
.
sourceakka.kafka.testkit.testcontainers {
# define these settings to select a different Kafka/ZooKeeper docker image
# we recommend using Confluent Platform docker images and using the same version across all images
# Confluent publishes images on DockerHub: https://hub.docker.com/r/confluentinc/cp-kafka/tags
# Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
zookeeper-image = "confluentinc/cp-zookeeper"
zookeeper-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
kafka-image = "confluentinc/cp-kafka"
kafka-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
schema-registry-image = "confluentinc/cp-schema-registry"
schema-registry-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
# See https://docs.confluent.io/platform/current/installation/versions-interoperability.html
confluent-platform-version = "7.0.0"
# the number of Kafka brokers to include in a test cluster
num-brokers = 1
# set this to use a replication factor for internal Kafka topics such as Consumer Offsets and Transaction log.
# this replication factor must be less than or equal to `num-brokers`
internal-topics-replication-factor = 1
# set this to true to launch a testcontainer for Confluent Schema Registry
use-schema-registry = false
# set this to true to stream the STDOUT and STDERR of containers to SLF4J loggers
# this requires the SLF4J dependency to be on the classpath and the loggers enabled in your logging configuration
container-logging = false
# set this to the total length of time to wait for a Kafka container cluster to start. this includes all brokers
# zookeeper, and schema registry nodes. note that this can take a considerable time in limited resource environments.
cluster-start-timeout = 360 s
# set this to the total length of time to wait for a Kafka container readiness check to complete. note that this can
# take a considerable time in limited resource environments.
readiness-check-timeout = 360 s
}
You can override all these defaults in code and per test class. By applying settings in code you can also configure the Kafka and ZooKeeper containers themselves.
For example, the following demonstrates creating a 3 Broker Kafka cluster and disables the automatic topic creation broker configuration using environment variables.
- Scala
-
source
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.matchers.should.Matchers abstract class SpecBase(kafkaPort: Int) extends ScalatestKafkaSpec(kafkaPort) with AnyWordSpecLike with Matchers with ScalaFutures with Eventually { protected def this() = this(kafkaPort = -1) } import akka.kafka.testkit.KafkaTestkitTestcontainersSettings import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike class TestcontainersNewSettingsSampleSpec extends SpecBase with TestcontainersKafkaPerClassLike { override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system) .withNumBrokers(3) .withInternalTopicsReplicationFactor(2) .withConfigureKafka { brokerContainers => brokerContainers.foreach(_.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")) } // ... }
- Java
-
source
import akka.actor.ActorSystem; import akka.kafka.testkit.KafkaTestkitTestcontainersSettings; import akka.kafka.testkit.javadsl.TestcontainersKafkaTest; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.TestInstance; @TestInstance(TestInstance.Lifecycle.PER_CLASS) class TestkitTestcontainersTest extends TestcontainersKafkaTest { private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest"); private static KafkaTestkitTestcontainersSettings testcontainersSettings = KafkaTestkitTestcontainersSettings.create(system) .withNumBrokers(3) .withInternalTopicsReplicationFactor(2) .withConfigureKafkaConsumer( brokerContainers -> brokerContainers.forEach( b -> b.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"))); TestkitTestcontainersTest() { // this will only start a new cluster if it has not already been started. // // you must stop the cluster in the afterClass implementation if you want to create a cluster // per test class // using (TestInstance.Lifecycle.PER_CLASS) super(system, testcontainersSettings); } // ... // omit this implementation if you want the cluster to stay up for all your tests @AfterAll void afterClass() { TestcontainersKafkaTest.stopKafka(); } }
To see what options are available for configuring testcontainers using configureKafka
and configureZooKeeper
in KafkaTestkitTestcontainersSettings
KafkaTestkitTestcontainersSettings
see the API docs for AlpakkaKafkaContainer
AlpakkaKafkaContainer
and GenericContainer
.
Testing with Schema Registry
If you use Confluent’s Schema Registry in your client code you may optionally run a Schema Registry container as well. The following snippet demonstrates overriding a test class to use the container, or you can enable it for all tests in your application.conf
.
- Scala
-
source
class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKafkaPerClassLike { override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system).withSchemaRegistry(true) // tests.. }
- Java
-
source
public class SchemaRegistrySerializationTest extends TestcontainersKafkaJunit4Test { private static final ActorSystem sys = ActorSystem.create("SchemaRegistrySerializationTest"); private static final Executor ec = Executors.newSingleThreadExecutor(); public SchemaRegistrySerializationTest() { super( sys, KafkaTestkitTestcontainersSettings.create(sys) .withInternalTopicsReplicationFactor(1) .withSchemaRegistry(true)); } }
You can retrieve the Schema Registry URL in your test configuration by calling getSchemaRegistryUrl()
or schemaRegistryUrl
.
Testing with a Docker Kafka cluster from Java code
The Alpakka Kafka testkit contains helper classes to start Kafka via Testcontainers. Alternatively, you may use just Testcontainers, as it is designed to be used with JUnit and you can follow their documentation to start and stop Kafka. To start a single instance for many tests see Singleton containers.
The Testcontainers dependency must be added to your project explicitly.
- Maven
<dependencies> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.20.1</version> <scope>test</scope> </dependency> </dependencies>
- sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.20.1" % Test
- Gradle
dependencies { testImplementation "org.testcontainers:kafka:1.20.1" }
The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.
- Java JUnit 4
-
source
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.Test; public class AssignmentTest extends TestcontainersKafkaJunit4Test { private static final ActorSystem sys = ActorSystem.create("AssignmentTest"); public AssignmentTest() { super(sys); } @AfterClass public static void afterClass() { TestKit.shutdownActorSystem(sys); } }
- Java JUnit 5
-
source
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest; import akka.testkit.javadsl.TestKit; import org.junit.jupiter.api.*; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.extension.ExtendWith; @TestInstance(Lifecycle.PER_CLASS) class ProducerTest extends TestcontainersKafkaTest { private static final ActorSystem system = ActorSystem.create(); ProducerTest() { super(system); } @AfterAll void shutdownActorSystem() { TestKit.shutdownActorSystem(system); } }
Testing with a Docker Kafka cluster from Scala code
The Testcontainers dependency must be added to your project explicitly.
- Maven
<dependencies> <dependency> <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>1.20.1</version> <scope>test</scope> </dependency> </dependencies>
- sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.20.1" % Test
- Gradle
dependencies { testImplementation "org.testcontainers:kafka:1.20.1" }
To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped
assertAllStagesStopped
. This may interfere with the stop-timeout
which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf
for tests.
One cluster for all tests
By mixing in TestcontainersKafkaLike
the Kafka Docker cluster will be started before the first test and shut down after all tests are finished.
- Scala
-
source
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures} import org.scalatest.wordspec.AnyWordSpecLike import org.scalatest.matchers.should.Matchers abstract class SpecBase(kafkaPort: Int) extends ScalatestKafkaSpec(kafkaPort) with AnyWordSpecLike with Matchers with ScalaFutures with Eventually { protected def this() = this(kafkaPort = -1) } import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike { // ... }
With this TestcontainersSampleSpec
class test classes can extend it to automatically start and stop a Kafka broker to test with.
One cluster per test class
By mixing in TestcontainersKafkaPerClassLike
a specific Kafka Docker cluster will be started for that test class and stopped after its run finished.