Testing

To simplify testing of streaming integrations with Alpakka Kafka, it provides the Alpakka Kafka testkit. It provides help for

Project Info: Alpakka Kafka testkit
Artifact
com.typesafe.akka
akka-stream-kafka-testkit
7.0.0
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Eclipse Temurin JDK 21
Scala versions2.13.14, 3.3.4
JPMS module nameakka.stream.alpakka.kafka.testkit
License
Readiness level
Since 1.0-M1, 2018-11-06
Note: The API of the testkit may change even for minor versions.
Home pagehttps://doc.akka.io/libraries/alpakka-kafka/current
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka-kafka
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka-testkit_${scala.binary.version}</artifactId>
    <version>7.0.0</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-testkit_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
    <scope>test</scope>
  </dependency>
</dependencies>
sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "7.0.0" % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  testImplementation "com.typesafe.akka:akka-stream-kafka-testkit_${versions.ScalaBinary}:7.0.0"
  testImplementation "com.typesafe.akka:akka-stream-testkit_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.

The table below shows Alpakka Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.typesafe.akkaakka-stream-kafka_2.137.0.0
com.typesafe.akkaakka-stream-testkit_2.132.10.0
org.scala-langscala-library2.13.14
Dependency tree
com.typesafe.akka    akka-stream-kafka_2.13    7.0.0
    com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.14    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.14    Apache-2.0
    org.apache.kafka    kafka-clients    3.7.1    The Apache License, Version 2.0
        com.github.luben    zstd-jni    1.5.6-3    BSD 2-Clause License
        org.lz4    lz4-java    1.8.0    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    1.7.36
        org.xerial.snappy    snappy-java    1.1.10.5    Apache-2.0
    org.scala-lang    scala-library    2.13.14    Apache-2.0
com.typesafe.akka    akka-stream-testkit_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.14    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.14    Apache-2.0
    com.typesafe.akka    akka-testkit_2.13    2.10.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.14    Apache-2.0
        org.scala-lang    scala-library    2.13.14    Apache-2.0
    org.scala-lang    scala-library    2.13.14    Apache-2.0
org.scala-lang    scala-library    2.13.14    Apache-2.0

Running Kafka with your tests

The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster using Testcontainers (Docker).

The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers. You must mix in or implement these types into your test classes to use them. See the documentation for each for more details.

Type Test Framework Cluster Lang Lifetime
akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test JUnit 4 Yes Java All tests, Per class
akka.kafka.testkit.javadsl.TestcontainersKafkaTest JUnit 5 Yes Java All tests, Per class
akka.kafka.testkit.scaladsl.TestcontainersKafkaLike ScalaTest Yes Scala All tests
akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike ScalaTest Yes Scala Per class

Alternative testing libraries

If using Maven and Java, an alternative library that provides running Kafka broker instance during testing is kafka-unit by salesforce. It has support for Junit 4 and 5 and supports many different versions of Kafka.

Mocking the Consumer or Producer

The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and Producer flows in akka.kafka.testkit.ProducerResultFactory.

To create the materialized value of Consumer sources, ConsumerControlFactoryConsumerControlFactory offers a wrapped KillSwitchKillSwitch.

Scala
sourceimport akka.kafka.testkit.scaladsl.ConsumerControlFactory
import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory}

// create elements emitted by the mocked Consumer
val elements = (0 to 10).map { i =>
  val nextOffset = startOffset + i
  ConsumerResultFactory.committableMessage(
    new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"),
    ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i")
  )
}

// create a source imitating the Consumer.committableSource
val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
  Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

// create a source imitating the Producer.flexiFlow
val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
                                  ProducerMessage.Results[String, String, CommittableOffset],
                                  NotUsed] =
  Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
    .map {
      case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
        ProducerResultFactory.result(msg)
      case other => throw new Exception(s"excluded: $other")
    }

// run the flow as if it was connected to a Kafka broker
val (control, streamCompletion) = mockedKafkaConsumerSource
  .map(
    msg =>
      ProducerMessage.Message(
        new ProducerRecord[String, String](targetTopic, msg.record.value),
        msg.committableOffset
      )
  )
  .via(mockedKafkaProducerFlow)
  .map(_.passThrough)
  .toMat(Committer.sink(committerSettings))(Keep.both)
  .run()
Java
sourceimport akka.kafka.testkit.ConsumerResultFactory;
import akka.kafka.testkit.ProducerResultFactory;
import akka.kafka.testkit.javadsl.ConsumerControlFactory;

// create elements emitted by the mocked Consumer
List<ConsumerMessage.CommittableMessage<String, String>> elements =
    Arrays.asList(
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset, "metadata")),
        ConsumerResultFactory.committableMessage(
            new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
            ConsumerResultFactory.committableOffset(
                groupId, topic, partition, startOffset + 1, "metadata 2")));

// create a source imitating the Consumer.committableSource
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
    mockedKafkaConsumerSource =
        Source.cycle(elements::iterator)
            .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

// create a source imitating the Producer.flexiFlow
Flow<
        ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
        ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
        NotUsed>
    mockedKafkaProducerFlow =
        Flow
            .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                create()
            .map(
                msg -> {
                  if (msg instanceof ProducerMessage.Message) {
                    ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                        msg2 =
                            (ProducerMessage.Message<
                                    String, String, ConsumerMessage.CommittableOffset>)
                                msg;
                    return ProducerResultFactory.result(msg2);
                  } else throw new RuntimeException("unexpected element: " + msg);
                });

// run the flow as if it was connected to a Kafka broker
Pair<Consumer.Control, CompletionStage<Done>> stream =
    mockedKafkaConsumerSource
        .map(
            msg -> {
              ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                  message =
                      new ProducerMessage.Message<>(
                          new ProducerRecord<>(
                              targetTopic, msg.record().key(), msg.record().value()),
                          msg.committableOffset());
              return message;
            })
        .via(mockedKafkaProducerFlow)
        .map(ProducerMessage.Results::passThrough)
        .toMat(Committer.sink(committerSettings), Keep.both())
        .run(sys);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.