Testing

To simplify testing of streaming integrations with Alpakka Kafka, it provides the Alpakka Kafka testkit. It provides help for

Project Info: Alpakka Kafka testkit
Artifact
com.typesafe.akka
akka-stream-kafka-testkit
2.0.3
JDK versions
Adopt OpenJDK 8 with Hotspot
Adopt OpenJDK 11 with Hotspot
Scala versions2.12.10, 2.11.12, 2.13.1
JPMS module nameakka.stream.alpakka.kafka.testkit
License
Readiness level
Since 1.0-M1, 2018-11-06
Note: The API of the testkit may change even for minor versions.
Home pagehttps://doc.akka.io/docs/alpakka-kafka/current
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka-kafka
Maven
<properties>
  <akka.version>2.5.30</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-kafka-testkit_${scala.binary.version}</artifactId>
  <version>2.0.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream-testkit_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
  <scope>test</scope>
</dependency>
sbt
val AkkaVersion = "2.5.30"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.3" % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)
Gradle
versions += [
  AkkaVersion: "2.5.30",
  ScalaBinary: "2.13"
]
dependencies {
  test group: 'com.typesafe.akka', name: "akka-stream-kafka-testkit_${versions.ScalaBinary}", version: '2.0.3',
  test group: 'com.typesafe.akka', name: "akka-stream-testkit_${versions.ScalaBinary}", version: versions.AkkaVersion
}

Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Alpakka Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively. We’ve overriden the commons-compress library to use a version with fewer known security vulnerabilities.

Direct dependencies
OrganizationArtifactVersion
com.typesafe.akkaakka-stream-kafka_2.132.0.3
com.typesafe.akkaakka-stream-testkit_2.132.5.30
org.scala-langscala-library2.13.1
Dependency tree
com.typesafe.akka    akka-stream-kafka_2.13    2.0.3
    com.typesafe.akka    akka-stream_2.13    2.5.30
        com.typesafe.akka    akka-actor_2.13    2.5.30
            com.typesafe    config    1.3.3
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0
        com.typesafe.akka    akka-protobuf_2.13    2.5.30
        com.typesafe    ssl-config-core_2.13    0.3.8
            com.typesafe    config    1.3.3
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2
        org.reactivestreams    reactive-streams    1.0.2
    org.apache.kafka    kafka-clients    2.4.1
        com.github.luben    zstd-jni    1.4.3-1
        org.lz4    lz4-java    1.6.0
        org.slf4j    slf4j-api    1.7.29
        org.xerial.snappy    snappy-java    1.1.7.3
    org.scala-lang.modules    scala-collection-compat_2.13    2.1.2
com.typesafe.akka    akka-stream-testkit_2.13    2.5.30
    com.typesafe.akka    akka-stream_2.13    2.5.30
        com.typesafe.akka    akka-actor_2.13    2.5.30
            com.typesafe    config    1.3.3
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0
        com.typesafe.akka    akka-protobuf_2.13    2.5.30
        com.typesafe    ssl-config-core_2.13    0.3.8
            com.typesafe    config    1.3.3
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2
        org.reactivestreams    reactive-streams    1.0.2
    com.typesafe.akka    akka-testkit_2.13    2.5.30
        com.typesafe.akka    akka-actor_2.13    2.5.30
            com.typesafe    config    1.3.3
            org.scala-lang.modules    scala-java8-compat_2.13    0.9.0
org.scala-lang    scala-library    2.13.1

Running Kafka with your tests

The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster. There are two main options:

  1. Embedded Kafka
  2. Testcontainers (Docker)

The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers. You must mix in or implement these types into your test classes to use them. See the documentation for each for more details.

Type Test Framework Runtime Mode Cluster Schema Registry Lang Lifetime
akka.kafka.testkit.javadsl.EmbeddedKafkaTest JUnit 5 Embedded Kafka No Yes Java All tests, Per class
akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test JUnit 4 Embedded Kafka No Yes Java All tests, Per class
akka.kafka.testkit.scaladsl.EmbeddedKafkaLike ScalaTest Embedded Kafka No Yes Scala Per class
akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test JUnit 5 Testcontainers Yes No Java All tests, Per class
akka.kafka.testkit.javadsl.TestcontainersKafkaTest JUnit 4 Testcontainers Yes No Java All tests, Per class
akka.kafka.testkit.scaladsl.TestcontainersKafkaLike ScalaTest Testcontainers Yes No Scala All tests
akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike ScalaTest Testcontainers Yes No Scala Per class

Testing with an embedded Kafka server

To test the Alpakka Kafka connector the Embedded Kafka library is an important tool as it helps to easily start and stop Kafka brokers from test cases.

Add the Embedded Kafka to your test dependencies:

Maven
<dependency>
  <groupId>io.github.embeddedkafka</groupId>
  <artifactId>embedded-kafka_2.12</artifactId>
  <version>2.4.1</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "io.github.embeddedkafka" % "embedded-kafka_2.12" % "2.4.1" % Test
Gradle
dependencies {
  test group: 'io.github.embeddedkafka', name: 'embedded-kafka_2.12', version: '2.4.1'
}
Note

As Kafka uses Scala internally, only the Scala versions supported by Kafka can be used together with Embedded Kafka. To be independent of Kafka’s supported Scala versions, run Kafka in a Docker container.

The helpers for running Embedded Kafka are available for Scala 2.11 and 2.12.

The testkit contains helper classes used by the tests in the Alpakka Kafka connector and may be used for other testing, as well.

Testing with Avro and Schema Registry

If you need to run tests using Confluent’s Schema Registry, you might include embedded-kafka-schema-registry instead.

Testing with Avro and Schema Registry from Java code

Test classes may extend EmbeddedKafkaTest (JUnit 5) or EmbeddedKafkaJunit4Test (JUnit 4) to automatically start and stop an embedded Kafka broker.

Furthermore it provides

  • preconfigured consumer settings (ConsumerSettings<String, String> consumerDefaults),
  • preconfigured producer settings (ProducerSettings<String, String> producerDefaults),
  • unique topic creation (createTopic(int number, int partitions, int replication)), and
  • CompletionStage value extraction helper (<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)).

The example below shows skeleton test classes for JUnit 4 and JUnit 5.

Java JUnit 4
import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;

public class AssignmentTest extends EmbeddedKafkaJunit4Test {

  @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

  private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
  private static final Materializer mat = ActorMaterializer.create(sys);

  public AssignmentTest() {
    super(sys, mat, KafkaPorts.AssignmentTest());
  }

  @AfterClass
  public static void afterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}
Java JUnit 5
import akka.kafka.testkit.javadsl.EmbeddedKafkaTest;
import akka.testkit.javadsl.TestKit;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.extension.ExtendWith;

@TestInstance(Lifecycle.PER_CLASS)
class ProducerExampleTest extends EmbeddedKafkaTest {

  private static final ActorSystem system = ActorSystem.create("ProducerExampleTest");
  private static final Materializer materializer = ActorMaterializer.create(system);

  ProducerExampleTest() {
    super(system, materializer, KafkaPorts.ProducerExamplesTest());
  }

  @AfterAll
  void shutdownActorSystem() {
    TestKit.shutdownActorSystem(system);
    executor.shutdown();
  }

}

The JUnit test base classes run the assertAllStagesStopped check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

Testing with Avro and Schema Registry from Scala code

The KafkaSpec class offers access to

  • preconfigured consumer settings (consumerDefaults: ConsumerSettings[String, String]),
  • preconfigured producer settings (producerDefaults: ProducerSettings[String, String]),
  • unique topic creation (createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)),
  • an implicit LoggingAdapter for use with the log() operator, and
  • other goodies.

EmbeddedKafkaLike extends KafkaSpec to add automatic starting and stopping of the embedded Kafka broker.

Some Alpakka Kafka tests implemented in Scala use Scalatest with the mix-ins shown below. You need to add Scalatest explicitly in your test dependencies (this release of Alpakka Kafka uses Scalatest 3.0.8.)

Maven
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>3.0.8</version>
  <scope>test</scope>
</dependency>
sbt
libraryDependencies += "org.scalatest" % "scalatest" % "3.0.8" % Test
Gradle
dependencies {
  test group: 'org.scalatest', name: 'scalatest', version: '3.0.8'
}
Scala
import akka.kafka.Repeated
import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import akka.kafka.tests.scaladsl.LogCapturing
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.{Matchers, WordSpecLike}

abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with WordSpecLike
    with Matchers
    with ScalaFutures
    with Eventually
    with LogCapturing
    with Repeated {

  protected def this() = this(kafkaPort = -1)
}

By mixing in EmbeddedKafkaLike an embedded Kafka instance will be started before the tests in this test class execute shut down after all tests in this test class are finished.

Scala
import akka.kafka.testkit.scaladsl.EmbeddedKafkaLike
import net.manub.embeddedkafka.EmbeddedKafkaConfig

class EmbeddedKafkaSampleSpec extends SpecBase(kafkaPort = 1234) with EmbeddedKafkaLike {

  // if a specific Kafka broker configuration is desired
  override def createKafkaConfig: EmbeddedKafkaConfig =
    EmbeddedKafkaConfig(kafkaPort,
                        zooKeeperPort,
                        Map(
                          "offsets.topic.replication.factor" -> "1",
                          "offsets.retention.minutes" -> "1",
                          "offsets.retention.check.interval.ms" -> "100"
                        ))

  // ...
}

With this EmbeddedKafkaSpecBase class test classes can extend it to automatically start and stop a Kafka broker to test with. To configure the Kafka broker non-default, override the createKafkaConfig as shown above.

To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.

Alternative testing libraries

If using Maven and Java, an alternative library that provides running Kafka broker instance during testing is kafka-unit by salesforce. It has support for Junit 4 and 5 and supports many different versions of Kafka.

Mocking the Consumer or Producer

The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and Producer flows in akka.kafka.testkit.ProducerResultFactory.

To create the materialized value of Consumer sources, ConsumerControlFactoryConsumerControlFactory offers a wrapped KillSwitchKillSwitch.

Scala
import akka.kafka.testkit.scaladsl.ConsumerControlFactory
import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory}

// create elements emitted by the mocked Consumer
val elements = immutable.Seq(
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, startOffset, "key", "value 1"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset, "metadata")
  ),
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, startOffset + 1, "key", "value 2"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset + 1, "metadata 2")
  )
)

// create a source imitating the Consumer.committableSource
val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
  Source
    .cycle(() => elements.iterator)
    .viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

// create a source imitating the Producer.flexiFlow
val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
                                  ProducerMessage.Results[String, String, CommittableOffset],
                                  NotUsed] =
  Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
    .map {
      case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
        ProducerResultFactory.result(msg)
      case other => throw new Exception(s"excluded: $other")
    }

// run the flow as if it was connected to a Kafka broker
val (control, streamCompletion) = mockedKafkaConsumerSource
  .map(
    msg =>
      ProducerMessage.Message(
        new ProducerRecord[String, String](targetTopic, msg.record.value),
        msg.committableOffset
      )
  )
  .via(mockedKafkaProducerFlow)
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .run()
Java
import akka.kafka.testkit.ConsumerResultFactory;
import akka.kafka.testkit.ProducerResultFactory;
import akka.kafka.testkit.javadsl.ConsumerControlFactory;

// create elements emitted by the mocked Consumer
List<ConsumerMessage.CommittableMessage<String, String>> elements =
    Arrays.asList(
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset, "metadata")),
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset + 1, "metadata 2")));

// create a source imitating the Consumer.committableSource
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
    mockedKafkaConsumerSource =
        Source.cycle(elements::iterator)
            .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

// create a source imitating the Producer.flexiFlow
Flow<
        ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
        ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
        NotUsed>
    mockedKafkaProducerFlow =
        Flow
            .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                create()
            .map(
                msg -> {
                  if (msg instanceof ProducerMessage.Message) {
                    ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                        msg2 =
                            (ProducerMessage.Message<
                                    String, String, ConsumerMessage.CommittableOffset>)
                                msg;
                    return ProducerResultFactory.result(msg2);
                  } else throw new RuntimeException("unexpected element: " + msg);
                });

// run the flow as if it was connected to a Kafka broker
Pair<Consumer.Control, CompletionStage<Done>> stream =
    mockedKafkaConsumerSource
        .map(
            msg -> {
              ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                  message =
                      new ProducerMessage.Message<>(
                          new ProducerRecord<>(
                              targetTopic, msg.record().key(), msg.record().value()),
                          msg.committableOffset());
              return message;
            })
        .via(mockedKafkaProducerFlow)
        .map(ProducerMessage.Results::passThrough)
        .toMat(Committer.sink(committerSettings), Keep.both())
        .run(mat);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.