Testing with a Docker Kafka cluster

The Testcontainers project contains a nice API to start and stop Apache Kafka in Docker containers. This becomes very relevant when your application code uses a Scala version which Apache Kafka doesn’t support so that EmbeddedKafka can’t be used. Testcontainers also allow you to create a complete Kafka cluster (using Docker containers) to simulate production use cases.

Settings

You can override testcontainers settings to create multi-broker Kafka clusters, or to finetune Kafka Broker and ZooKeeper configuration, by updating KafkaTestkitTestcontainersSettingsKafkaTestkitTestcontainersSettings in code or configuration. The KafkaTestkitTestcontainersSettingsKafkaTestkitTestcontainersSettings type can be used to perform actions such as:

  • Set the version of Confluent Platform docker images to use
  • Define number of Kafka brokers
  • Overriding container settings and environment variables (i.e. to change default Broker config)
  • Apply custom docker configuration to the Kafka and ZooKeeper containers used to create a cluster

To change defaults for all settings update the appropriate configuration in akka.kafka.testkit.testcontainers.

akka.kafka.testkit.testcontainers {

  # define this to select a different Kafka version by choosing the desired version of Confluent Platform
  # available Docker images: https://hub.docker.com/r/confluentinc/cp-kafka/tags
  # Kafka versions in Confluent Platform: https://docs.confluent.io/current/installation/versions-interoperability.html
  confluent-platform-version = "5.4.0"

  # the number of Kafka brokers to include in a test cluster
  num-brokers = 1

  # set this to use a replication factor for internal Kafka topics such as Consumer Offsets and Transaction log.
  # this replication factor must be less than or equal to `num-brokers`
  internal-topics-replication-factor = 1

  # set this to true to use launch a testcontainer for Confluent Schema Registry
  use-schema-registry = false
}

You can override all these defaults in code and per test class. By applying settings in code you can also configure the Kafka and ZooKeeper containers themselves.

For example, the following demonstrates creating a 3 Broker Kafka cluster and disables the automatic topic creation broker configuration using environment variables.

Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{Matchers, WordSpecLike}

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

import akka.kafka.testkit.KafkaTestkitTestcontainersSettings
import akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike

class TestcontainersNewSettingsSampleSpec extends SpecBase with TestcontainersKafkaPerClassLike {

  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system)
    .withNumBrokers(3)
    .withInternalTopicsReplicationFactor(2)
    .withConfigureKafka { brokerContainers =>
      brokerContainers.foreach(_.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false"))
    }

  // ...
}
Java
import akka.actor.ActorSystem;
import akka.kafka.testkit.KafkaTestkitTestcontainersSettings;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TestkitTestcontainersTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create("TestkitTestcontainersTest");
  private static final Materializer materializer = ActorMaterializer.create(system);

  private static KafkaTestkitTestcontainersSettings testcontainersSettings =
      KafkaTestkitTestcontainersSettings.create(system)
          .withNumBrokers(3)
          .withInternalTopicsReplicationFactor(2)
          .withConfigureKafkaConsumer(
              brokerContainers ->
                  brokerContainers.forEach(
                      b -> b.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")));

  TestkitTestcontainersTest() {
    // this will only start a new cluster if it has not already been started.
    //
    // you must stop the cluster in the afterClass implementation if you want to create a cluster
    // per test class
    // using (TestInstance.Lifecycle.PER_CLASS)
    super(system, materializer, testcontainersSettings);
  }

  // ...

  // omit this implementation if you want the cluster to stay up for all your tests
  @AfterAll
  void afterClass() {
    TestcontainersKafkaTest.stopKafka();
  }
}

To see what options are available for configuring testcontainers using configureKafka and configureZooKeeper in KafkaTestkitTestcontainersSettingsKafkaTestkitTestcontainersSettings see the API docs for AlpakkaKafkaContainerAlpakkaKafkaContainer and GenericContainer.

Testing with Schema Registry

If you use Confluent’s Schema Registry in your client code you may optionally run a Schema Registry container as well. The following snippet demonstrates overriding a test class to use the container, or you can enable it for all tests in your application.conf.

Scala
class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKafkaPerClassLike {

  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system).withSchemaRegistry(true)

  // tests..
}
Java
public class SchemaRegistrySerializationTest extends TestcontainersKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("SchemaRegistrySerializationTest");
  private static final Materializer mat = ActorMaterializer.create(sys);
  private static final Executor ec = Executors.newSingleThreadExecutor();

  public SchemaRegistrySerializationTest() {
    super(
        sys,
        mat,
        KafkaTestkitTestcontainersSettings.create(sys)
            .withInternalTopicsReplicationFactor(1)
            .withSchemaRegistry(true));
  }

}

You can retrieve the Schema Registry URL in your test configuration by calling getSchemaRegistryUrl() or schemaRegistryUrl.

Testing with a Docker Kafka cluster from Java code

The Alpakka Kafka testkit contains helper classes to start Kafka via Testcontainers. Alternatively, you may use just Testcontainers, as it is designed to be used with JUnit and you can follow their documentation to start and stop Kafka. To start a single instance for many tests see Singleton containers.

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.14.3</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.14.3" % Test
Gradle
dependencies {
  test group: 'org.testcontainers', name: 'kafka', version: '1.14.3'
}

The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.

Java JUnit 4
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.Test;

public class AssignmentWithTestcontainersTest extends TestcontainersKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
  private static final Materializer mat = ActorMaterializer.create(sys);

  public AssignmentWithTestcontainersTest() {
    super(sys, mat);
  }

  @AfterClass
  public static void afterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}
Java JUnit 5
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.extension.ExtendWith;

@TestInstance(Lifecycle.PER_CLASS)
class ProducerWithTestcontainersTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create();
  private static final Materializer materializer = ActorMaterializer.create(system);

  ProducerWithTestcontainersTest() {
    super(system, materializer);
  }

  @AfterAll
  void shutdownActorSystem() {
    TestKit.shutdownActorSystem(system);
  }

}

Testing with a Docker Kafka cluster from Scala code

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.14.3</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.14.3" % Test
Gradle
dependencies {
  test group: 'org.testcontainers', name: 'kafka', version: '1.14.3'
}

To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStoppedassertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

One cluster for all tests

By mixing in TestcontainersKafkaLike the Kafka Docker cluster will be started before the first test and shut down after all tests are finished.

Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{Matchers, WordSpecLike}

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike

class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {
  // ...
}

With this TestcontainersSampleSpec class test classes can extend it to automatically start and stop a Kafka broker to test with.

One cluster per test class

By mixing in TestcontainersKafkaPerClassLike a specific Kafka Docker cluster will be started for that test class and stopped after its run finished.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.