Testing

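To simplify testing of streaming integrations, Alpakka Kafka provides the Alpakka Kafka testkit. It helps with mocking the Alpakka Kafka consumers and producers, running tests against an embedded Kafka broker, and running Kafka in Docker via Testcontainers.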

Project Info: Alpakka Kafka testkit
Artifact: com.typesafe.akka : akka-stream-kafka-testkit : 1.1.0
JDK versions: Adopt OpenJDK 8 with Hotspot, Adopt OpenJDK 11 with Hotspot
Scala versions: 2.12.9, 2.11.12, 2.13.0
JPMS module name: akka.stream.alpakka.kafka.testkit
Readiness level: since 1.0-M1, 2018-11-06
Note: The API of the testkit may change even for minor versions.
Home page: https://doc.akka.io/docs/alpakka-kafka/current
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka-kafka
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-kafka-testkit_2.13</artifactId>
  <version>1.1.0</version>
</dependency>
sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.1.0"
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-stream-kafka-testkit_2.13', version: '1.1.0'
}

Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Alpakka Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively. We’ve overridden the commons-compress library to use a version with fewer known security vulnerabilities.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream-kafka_2.13    1.1.0    Apache-2.0
com.typesafe.akka    akka-stream-testkit_2.13    2.5.23    Apache License, Version 2.0
org.scala-lang    scala-library    2.13.0    Apache-2.0
Dependency tree
com.typesafe.akka    akka-stream-kafka_2.13    1.1.0    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.5.23    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.5.23    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.0    Apache-2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        com.typesafe.akka    akka-protobuf_2.13    2.5.23    Apache License, Version 2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.0    Apache-2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.0    Apache-2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.2    CC0
        org.scala-lang    scala-library    2.13.0    Apache-2.0
    org.apache.kafka    kafka-clients    2.1.1    The Apache Software License, Version 2.0
        com.github.luben    zstd-jni    1.3.7-1    BSD 2-Clause License
        org.lz4    lz4-java    1.5.0    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    1.7.25    MIT License
        org.xerial.snappy    snappy-java    1.1.7.2    The Apache Software License, Version 2.0
    org.scala-lang.modules    scala-collection-compat_2.13    2.1.1    Apache-2.0
        org.scala-lang    scala-library    2.13.0    Apache-2.0
    org.scala-lang    scala-library    2.13.0    Apache-2.0
com.typesafe.akka    akka-stream-testkit_2.13    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-stream_2.13    2.5.23    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.5.23    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.0    Apache-2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        com.typesafe.akka    akka-protobuf_2.13    2.5.23    Apache License, Version 2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.0    Apache-2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.0    Apache-2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.2    CC0
        org.scala-lang    scala-library    2.13.0    Apache-2.0
    com.typesafe.akka    akka-testkit_2.13    2.5.23    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.5.23    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0    Apache-2.0
                org.scala-lang    scala-library    2.13.0    Apache-2.0
            org.scala-lang    scala-library    2.13.0    Apache-2.0
        org.scala-lang    scala-library    2.13.0    Apache-2.0
    org.scala-lang    scala-library    2.13.0    Apache-2.0
org.scala-lang    scala-library    2.13.0    Apache-2.0

Mocking the Consumer or Producer

The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and Producer flows in akka.kafka.testkit.ProducerResultFactory.

To create the materialized value of Consumer sources, akka.kafka.testkit.scaladsl.ConsumerControlFactory (Scala) or akka.kafka.testkit.javadsl.ConsumerControlFactory (Java) offers a wrapped KillSwitch.

Scala
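import akka.NotUsed
import akka.kafka.ConsumerMessage.CommittableOffset
import akka.kafka.scaladsl.{Committer, Consumer}
import akka.kafka.testkit.scaladsl.ConsumerControlFactory
import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory}
import akka.kafka.{ConsumerMessage, ProducerMessage}
import akka.stream.scaladsl.{Flow, Keep, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.ProducerRecord
import scala.collection.immutable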

// create elements emitted by the mocked Consumer
val elements = immutable.Seq(
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, startOffset, "key", "value 1"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset, "metadata")
  ),
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, startOffset + 1, "key", "value 2"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset + 1, "metadata 2")
  )
)

// create a source imitating the Consumer.committableSource
val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
  Source
    .cycle(() => elements.iterator)
    .viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

// create a flow imitating the Producer.flexiFlow
val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
                                  ProducerMessage.Results[String, String, CommittableOffset],
                                  NotUsed] =
  Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
    .map {
      case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
        ProducerResultFactory.result(msg)
      case other => throw new Exception(s"excluded: $other")
    }

// run the flow as if it was connected to a Kafka broker
val (control, streamCompletion) = mockedKafkaConsumerSource
  .map(
    msg =>
      ProducerMessage.Message(
        new ProducerRecord[String, String](targetTopic, msg.record.value),
        msg.committableOffset
      )
  )
  .via(mockedKafkaProducerFlow)
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .run()
Java
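import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.kafka.ConsumerMessage;
import akka.kafka.ProducerMessage;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.kafka.testkit.ConsumerResultFactory;
import akka.kafka.testkit.ProducerResultFactory;
import akka.kafka.testkit.javadsl.ConsumerControlFactory;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;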

// create elements emitted by the mocked Consumer
List<ConsumerMessage.CommittableMessage<String, String>> elements =
    Arrays.asList(
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset, "metadata")),
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset + 1, "metadata 2")));

// create a source imitating the Consumer.committableSource
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
    mockedKafkaConsumerSource =
        Source.cycle(elements::iterator)
            .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

// create a flow imitating the Producer.flexiFlow
Flow<
        ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
        ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
        NotUsed>
    mockedKafkaProducerFlow =
        Flow
            .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                create()
            .map(
                msg -> {
                  if (msg instanceof ProducerMessage.Message) {
                    ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                        msg2 =
                            (ProducerMessage.Message<
                                    String, String, ConsumerMessage.CommittableOffset>)
                                msg;
                    return ProducerResultFactory.result(msg2);
                  } else throw new RuntimeException("unexpected element: " + msg);
                });

// run the flow as if it was connected to a Kafka broker
Pair<Consumer.Control, CompletionStage<Done>> stream =
    mockedKafkaConsumerSource
        .map(
            msg -> {
              ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                  message =
                      new ProducerMessage.Message<>(
                          new ProducerRecord<>(
                              targetTopic, msg.record().key(), msg.record().value()),
                          msg.committableOffset());
              return message;
            })
        .via(mockedKafkaProducerFlow)
        .map(ProducerMessage.Results::passThrough)
        .toMat(Committer.sink(committerSettings), Keep.both())
        .run(mat);

Testing with an embedded Kafka server

The Embedded Kafka library is an important tool for testing the Alpakka Kafka connector, as it makes it easy to start and stop Kafka brokers from test cases.

Note

As Kafka uses Scala internally, only the Scala versions supported by Kafka can be used together with Embedded Kafka. To be independent of Kafka’s supported Scala versions, run Kafka in a Docker container.

The helpers for running Embedded Kafka are available for Scala 2.11 and 2.12.

The testkit contains helper classes that are used by the tests in the Alpakka Kafka connector and may also be used for other testing.

Testing with Avro and Schema Registry

If you need to run tests using Confluent’s Schema Registry, you might include embedded-kafka-schema-registry instead.

Testing with an embedded Kafka server from Java code

Test classes may extend akka.kafka.testkit.javadsl.EmbeddedKafkaTest (JUnit 5) or akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test (JUnit 4) to automatically start and stop an embedded Kafka broker.

Furthermore, these base classes provide

  • preconfigured consumer settings (ConsumerSettings<String, String> consumerDefaults),
  • preconfigured producer settings (ProducerSettings<String, String> producerDefaults),
  • unique topic creation (createTopic(int number, int partitions, int replication)), and
  • CompletionStage value extraction helper (<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)).
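To make the last two helpers more concrete, here is a minimal sketch of what such utilities might look like, written against the JDK only. It is not the testkit's implementation: the class name `TestHelpersSketch`, the method `uniqueTopicName`, and the `topic-<number>-<sequence>` naming scheme are assumptions made for illustration, and no topic is actually created on a broker.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class; names and naming scheme are illustrative assumptions.
public class TestHelpersSketch {

  // Shared counter so that every call yields a name no earlier call returned.
  private static final AtomicInteger sequence = new AtomicInteger();

  // Derives a topic name that is unique within this JVM (assumed naming scheme).
  public static String uniqueTopicName(int number) {
    return "topic-" + number + "-" + sequence.incrementAndGet();
  }

  // Blocks until the stage completes or the timeout elapses, then returns its value.
  public static <T> T resultOf(CompletionStage<T> stage, Duration timeout) {
    try {
      return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for the stage", e);
    } catch (ExecutionException | TimeoutException e) {
      throw new RuntimeException("stage did not complete successfully within " + timeout, e);
    }
  }

  public static void main(String[] args) {
    // Two calls with the same number still produce different names.
    System.out.println(uniqueTopicName(1));
    System.out.println(uniqueTopicName(1));

    // Extract the value of an asynchronously completed stage.
    CompletionStage<String> stage = CompletableFuture.supplyAsync(() -> "done");
    System.out.println(resultOf(stage, Duration.ofSeconds(5)));
  }
}
```

Unique names keep tests that share one broker from reading each other's records, and a blocking extraction helper with an explicit timeout makes a hanging stream fail the test instead of stalling it.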

The example below shows skeleton test classes for JUnit 4 and JUnit 5.

Java JUnit 4
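import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;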
import org.junit.AfterClass;
import org.junit.Test;

public class AssignmentTest extends EmbeddedKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
  private static final Materializer mat = ActorMaterializer.create(sys);

  public AssignmentTest() {
    super(sys, mat, KafkaPorts.AssignmentTest());
  }

  @AfterClass
  public static void afterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}
Java JUnit 5
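import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.EmbeddedKafkaTest;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;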
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;

@TestInstance(Lifecycle.PER_CLASS)
class ProducerExampleTest extends EmbeddedKafkaTest {

  private static final ActorSystem system = ActorSystem.create("ProducerExampleTest");
  private static final Materializer materializer = ActorMaterializer.create(system);

  ProducerExampleTest() {
    super(system, materializer, KafkaPorts.ProducerExamplesTest());
  }

  @AfterAll
  void shutdownActorSystem() {
    TestKit.shutdownActorSystem(system);
  }

}

The JUnit test base classes run the assertAllStagesStopped check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
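As a sketch of such an override, the fragment below could go into the `application.conf` used by your tests (for example under `src/test/resources`, an assumed location). The setting key `akka.kafka.consumer.stop-timeout` comes from Alpakka Kafka's consumer configuration; the value shown is only an example and should be tuned to your tests.

```hocon
# Test-only override: let consumer stages stop quickly so that
# the assertAllStagesStopped check is not delayed by the stop-timeout.
akka.kafka.consumer {
  stop-timeout = 10 ms  # example value, not a recommendation
}
```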

Testing with an embedded Kafka server from Scala code

The KafkaSpec class offers access to

  • preconfigured consumer settings (consumerDefaults: ConsumerSettings[String, String]),
  • preconfigured producer settings (producerDefaults: ProducerSettings[String, String]),
  • unique topic creation (createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)),
  • an implicit LoggingAdapter for use with the log() operator, and
  • other goodies.

EmbeddedKafkaLike extends KafkaSpec to add automatic starting and stopping of the embedded Kafka broker.

Some Alpakka Kafka tests implemented in Scala use ScalaTest with the mix-ins shown below. You need to add ScalaTest explicitly to your test dependencies (this release of Alpakka Kafka uses ScalaTest 3.0.8).

Maven
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest_2.13</artifactId>
  <version>3.0.8</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test
Gradle
dependencies {
  testCompile group: 'org.scalatest', name: 'scalatest_2.13', version: '3.0.8'
}
Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.{Matchers, WordSpecLike}

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

By mixing in EmbeddedKafkaLike, an embedded Kafka instance will be started before the tests in this test class execute and shut down after all tests in this test class are finished.

Scala
import akka.kafka.testkit.scaladsl.EmbeddedKafkaLike
import net.manub.embeddedkafka.EmbeddedKafkaConfig

class EmbeddedKafkaSampleSpec extends SpecBase(kafkaPort = 1234) with EmbeddedKafkaLike {

  // if a specific Kafka broker configuration is desired
  override def createKafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort,
                        zooKeeperPort,
                        Map(
                          "offsets.topic.replication.factor" -> "1",
                          "offsets.retention.minutes" -> "1",
                          "offsets.retention.check.interval.ms" -> "100"
                        ))

  // ...
}

Test classes can extend a base class like this one to automatically start and stop a Kafka broker to test with. To use a non-default Kafka broker configuration, override createKafkaConfig as shown above.

To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

Testing with Kafka in Docker

The Testcontainers project contains a nice API to start and stop Apache Kafka in Docker containers. This is particularly relevant when your application code uses a Scala version that Apache Kafka doesn’t support, in which case Embedded Kafka can’t be used.

Note

The Testcontainers support was added in Alpakka Kafka 1.0.2 and may still evolve.

Testing with Kafka in Docker from Java code

The Alpakka Kafka testkit contains helper classes to start Kafka via Testcontainers. Alternatively, you may use just Testcontainers, as it is designed to be used with JUnit and you can follow their documentation to start and stop Kafka. To start a single instance for many tests see Singleton containers.

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.11.2</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.11.2" % Test
Gradle
dependencies {
  testCompile group: 'org.testcontainers', name: 'kafka', version: '1.11.2'
}

The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.

Java JUnit 4
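import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;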
import org.junit.AfterClass;
import org.junit.Test;

public class AssignmentWithTestcontainersTest extends TestcontainersKafkaJunit4Test {

  private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
  private static final Materializer mat = ActorMaterializer.create(sys);

  public AssignmentWithTestcontainersTest() {
    super(sys, mat);
  }

  @AfterClass
  public static void afterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}
Java JUnit 5
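import akka.actor.ActorSystem;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.testkit.javadsl.TestKit;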
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;

@TestInstance(Lifecycle.PER_CLASS)
class ProducerWithTestcontainersTest extends TestcontainersKafkaTest {

  private static final ActorSystem system = ActorSystem.create();
  private static final Materializer materializer = ActorMaterializer.create(system);

  ProducerWithTestcontainersTest() {
    super(system, materializer);
  }

  @AfterAll
  void shutdownActorSystem() {
    TestKit.shutdownActorSystem(system);
  }

}

Testing with Kafka in Docker from Scala code

The Testcontainers dependency must be added to your project explicitly.

Maven
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.11.2</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.testcontainers" % "kafka" % "1.11.2" % Test
Gradle
dependencies {
  testCompile group: 'org.testcontainers', name: 'kafka', version: '1.11.2'
}

By mixing in TestcontainersKafkaLike, the Kafka Docker container will be started before the first test and shut down after all tests are finished.

Scala
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.{Matchers, WordSpecLike}

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike

class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {
  // ...
}

Test classes written like TestcontainersSampleSpec automatically start and stop a Kafka broker to test with.

To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

Alternative testing libraries

If you use Maven and Java, an alternative library that provides a running Kafka broker instance during testing is kafka-junit by Salesforce. It supports JUnit 4 and 5 as well as many different versions of Kafka.
