Testing with a Docker Kafka cluster

The Testcontainers project provides an API to start and stop Apache Kafka in Docker containers. This is particularly relevant when your application code uses a Scala version that Apache Kafka doesn’t support, so that EmbeddedKafka can’t be used. Testcontainers also allows you to create a complete Kafka cluster (using Docker containers) to simulate production use cases.

Settings

You can override Testcontainers settings to create multi-broker Kafka clusters, or to fine-tune Kafka Broker, ZooKeeper and Schema Registry configuration, by updating KafkaTestkitTestcontainersSettings in code or configuration. The KafkaTestkitTestcontainersSettings type can be used to perform actions such as:

  • Set the Docker image and tag to use for Kafka, ZooKeeper, and Schema Registry (a recent Confluent Platform version is used by default)
  • Define the number of Kafka brokers
  • Override container settings and environment variables (e.g. to change the default Broker config)
  • Apply custom Docker configuration to the Kafka and ZooKeeper containers used to create a cluster

To change the defaults, update the appropriate settings under akka.kafka.testkit.testcontainers in your configuration.

akka.kafka.testkit.testcontainers {

  # define these settings to select a different Kafka/ZooKeeper docker image
  # we recommend using Confluent Platform docker images and using the same version across all images
  # Confluent publishes images on DockerHub: https://hub.docker.com/r/confluentinc/cp-kafka/tags
  # Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
  zookeeper-image = "confluentinc/cp-zookeeper"
  zookeeper-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
  kafka-image = "confluentinc/cp-kafka"
  kafka-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
  schema-registry-image = "confluentinc/cp-schema-registry"
  schema-registry-image-tag = ${akka.kafka.testkit.testcontainers.confluent-platform-version}
  # See https://docs.confluent.io/platform/current/installation/versions-interoperability.html
  confluent-platform-version = "7.0.0"

  # the number of Kafka brokers to include in a test cluster
  num-brokers = 1

  # set this to use a replication factor for internal Kafka topics such as Consumer Offsets and Transaction log.
  # this replication factor must be less than or equal to `num-brokers`
  internal-topics-replication-factor = 1

  # set this to true to launch a testcontainer for Confluent Schema Registry
  use-schema-registry = false

  # set this to true to stream the STDOUT and STDERR of containers to SLF4J loggers
  # this requires the SLF4J dependency to be on the classpath and the loggers enabled in your logging configuration
  container-logging = false

  # set this to the total length of time to wait for a Kafka container cluster to start. this includes all brokers
  # zookeeper, and schema registry nodes. note that this can take a considerable time in limited resource environments.
  cluster-start-timeout = 360 s

  # set this to the total length of time to wait for a Kafka container readiness check to complete. note that this can
  # take a considerable time in limited resource environments.
  readiness-check-timeout = 360 s
}
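
The comment on internal-topics-replication-factor states that it must be less than or equal to num-brokers. As a minimal sketch of that rule, the plain-Java check below rejects an inconsistent pair of values before any container is started. The ClusterSizing class and its validate method are hypothetical names introduced for illustration; they are not part of the Alpakka Kafka testkit.

```java
// Hypothetical helper for illustration only; not part of the Alpakka Kafka testkit.
final class ClusterSizing {

  /**
   * Checks the rule documented for `internal-topics-replication-factor`:
   * it must be at least 1 and must not exceed `num-brokers`.
   * Throws IllegalArgumentException when the values are inconsistent.
   */
  static void validate(int numBrokers, int internalTopicsReplicationFactor) {
    if (numBrokers < 1) {
      throw new IllegalArgumentException("num-brokers must be at least 1, got " + numBrokers);
    }
    if (internalTopicsReplicationFactor < 1) {
      throw new IllegalArgumentException(
          "internal-topics-replication-factor must be at least 1, got "
              + internalTopicsReplicationFactor);
    }
    if (internalTopicsReplicationFactor > numBrokers) {
      throw new IllegalArgumentException(
          "internal-topics-replication-factor ("
              + internalTopicsReplicationFactor
              + ") must be less than or equal to num-brokers ("
              + numBrokers
              + ")");
    }
  }

  public static void main(String[] args) {
    validate(3, 2); // three brokers, replication factor two: accepted
    try {
      validate(1, 2); // one broker cannot hold two replicas
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

A check like this could run in a shared test base class so that a misconfigured cluster fails fast instead of waiting for cluster-start-timeout to expire.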

You can override all these defaults in code and per test class. By applying settings in code you can also configure the Kafka and ZooKeeper containers themselves.

For example, the following creates a three-broker Kafka cluster and disables automatic topic creation on the brokers using an environment variable.

Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.matchers.should.Matchers

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with AnyWordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike

class TestcontainersNewSettingsSampleSpec extends SpecBase with TestcontainersKafkaPerClassLike {

  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
    .withNumBrokers(3)
    .withInternalTopicsReplicationFactor(2)
    .withConfigureKafka { brokerContainers =>
      brokerContainers.foreach(_.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"))
    }

  // ...
}
Java
import akka.actor.ActorSystem;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestkitTestcontainersTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest");

  private static KafkaTestkitTestcontainersSettings testcontainersSettings =
      KafkaTestkitTestcontainersSettings.create(system)
          .withNumBrokers(3)
          .withInternalTopicsReplicationFactor(2)
          .withConfigureKafkaConsumer(
              brokerContainers ->
                  brokerContainers.forEach(
                      b -> b.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")));

  TestkitTestcontainersTest() {
    // This only starts a new cluster if one has not already been started.
    //
    // To create a cluster per test class (with TestInstance.Lifecycle.PER_CLASS),
    // stop the cluster in the afterClass implementation.
    super(system, testcontainersSettings);
  }

  // ...

  // omit this implementation if you want the cluster to stay up for all your tests
  @AfterAll
  void afterClass() {
    TestcontainersKafkaTest.stopKafka();
  }
}

To see what options are available for configuring testcontainers using configureKafka and configureZooKeeper in KafkaTestkitTestcontainersSettings, see the API docs for AlpakkaKafkaContainer and GenericContainer.
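
The KAFKA_AUTO_CREATE_TOPICS_ENABLE variable used in the examples above corresponds to the auto.create.topics.enable broker property. The sketch below shows that mapping under the assumption that the Confluent Platform images derive broker properties from environment variables by adding a KAFKA_ prefix, upper-casing the name, and replacing periods with underscores. BrokerEnvVars and toEnvVar are hypothetical names, and the helper deliberately handles only dot-separated property names; check the Confluent image documentation for properties that contain dashes or underscores.

```java
import java.util.Locale;

// Hypothetical helper for illustration only; not part of the Alpakka Kafka testkit.
final class BrokerEnvVars {

  /**
   * Maps a dot-separated broker property name to the environment variable
   * name assumed by this sketch: "KAFKA_" + upper-cased name with '.' -> '_'.
   */
  static String toEnvVar(String brokerProperty) {
    return "KAFKA_" + brokerProperty.toUpperCase(Locale.ROOT).replace('.', '_');
  }

  public static void main(String[] args) {
    // prints KAFKA_AUTO_CREATE_TOPICS_ENABLE
    System.out.println(toEnvVar("auto.create.topics.enable"));
  }
}
```

The resulting name can be passed to withEnv on each broker container, as in the settings examples above.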

Testing with Schema Registry

If you use Confluent’s Schema Registry in your client code, you may optionally run a Schema Registry container as well. The following snippet enables the container for a single test class; to enable it for all tests, set use-schema-registry to true in your application.conf.

Scala
class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKafkaPerClassLike {

  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system).withSchemaRegistry(true)

  // tests..
}
Java
public class SchemaRegistrySerializationTest extends TestcontainersKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("SchemaRegistrySerializationTest");
  private static final Executor ec = Executors.newSingleThreadExecutor();

  public SchemaRegistrySerializationTest() {
    super(
        sys,
        KafkaTestkitTestcontainersSettings.create(sys)
            .withInternalTopicsReplicationFactor(1)
            .withSchemaRegistry(true));
  }

}

You can retrieve the Schema Registry URL in your tests by calling getSchemaRegistryUrl() (Java) or schemaRegistryUrl (Scala).
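
Once you have the URL, it is typically handed to the Confluent serializers and deserializers through their schema.registry.url setting. The following is a minimal sketch of building such a configuration map in plain Java. SchemaRegistryClientConfig and serdeConfig are hypothetical names, and the URL in main is a stand-in; in a real test you would pass the value returned by the testkit instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Alpakka Kafka testkit.
final class SchemaRegistryClientConfig {

  /** Builds a serializer/deserializer config map pointing at the given Schema Registry. */
  static Map<String, Object> serdeConfig(String schemaRegistryUrl) {
    Map<String, Object> config = new HashMap<>();
    // "schema.registry.url" is the config key read by Confluent's serializers.
    config.put("schema.registry.url", schemaRegistryUrl);
    return config;
  }

  public static void main(String[] args) {
    // Stand-in URL; a test would use the URL of the running container instead.
    Map<String, Object> config = serdeConfig("http://localhost:8081");
    System.out.println(config);
  }
}
```

A map like this can be passed to the configure method of a Confluent serializer or deserializer before it is used in producer or consumer settings.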

Testing with a Docker Kafka cluster from Java code

The Alpakka Kafka testkit contains helper classes to start Kafka via Testcontainers. Alternatively, you may use Testcontainers directly: it is designed to be used with JUnit, and you can follow its documentation to start and stop Kafka. To start a single instance for many tests, see Singleton containers.

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependencies>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
  </dependency>
</dependencies>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.19.7" % Test
Gradle
dependencies {
  testImplementation "org.testcontainers:kafka:1.19.7"
}

The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.

Java JUnit 4
import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.Test;

public class AssignmentTest extends TestcontainersKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("AssignmentTest");

  public AssignmentTest() {
    super(sys);
  }

  @AfterClass
  public static void afterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}
Java JUnit 5
import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.extension.ExtendWith;

@TestInstance(Lifecycle.PER_CLASS)
class ProducerTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create();

  ProducerTest() {
    super(system);
  }

  @AfterAll
  void shutdownActorSystem() {
    TestKit.shutdownActorSystem(system);
  }

}

Testing with a Docker Kafka cluster from Scala code

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependencies>
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.7</version>
    <scope>test</scope>
  </dependency>
</dependencies>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.19.7" % Test
Gradle
dependencies {
  testImplementation "org.testcontainers:kafka:1.19.7"
}

To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

One cluster for all tests

By mixing in TestcontainersKafkaLike, the Kafka Docker cluster will be started before the first test and shut down after all tests are finished.

Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.wordspec.AnyWordSpecLike
import org.scalatest.matchers.should.Matchers

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with AnyWordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike

class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {
  // ...
}

Test classes can extend this TestcontainersSampleSpec class to automatically start and stop a Kafka broker to test against.

One cluster per test class

By mixing in TestcontainersKafkaPerClassLike, a dedicated Kafka Docker cluster will be started for that test class and stopped after the class has finished running.
