Testing
To simplify testing of streaming integrations, Alpakka Kafka provides the Alpakka Kafka testkit. It helps with
- Using Docker to launch a local Kafka cluster with Testcontainers
- Mocking the Alpakka Kafka Consumers and Producers
Project Info: Alpakka Kafka testkit | |
---|---|
Artifact | com.typesafe.akka, akka-stream-kafka-testkit, 2.1.1 |
JDK versions | Adopt OpenJDK 8 with Hotspot, Adopt OpenJDK 11 with Hotspot |
Scala versions | 2.12.13, 2.13.4 |
JPMS module name | akka.stream.alpakka.kafka.testkit |
License | |
Readiness level | Since 1.0-M1, 2018-11-06. Note: The API of the testkit may change even for minor versions. |
Home page | https://doc.akka.io/docs/alpakka-kafka/current |
Forums | |
Release notes | In the documentation |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka-kafka |
- Maven
    <properties>
      <akka.version>2.6.15</akka.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream-kafka-testkit_${scala.binary.version}</artifactId>
        <version>2.1.1</version>
        <scope>test</scope>
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream-testkit_${scala.binary.version}</artifactId>
        <version>${akka.version}</version>
        <scope>test</scope>
      </dependency>
    </dependencies>
- sbt
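For sbt builds, the Maven coordinates above should translate to roughly the following. This is a sketch derived from those coordinates rather than a copy of the official snippet; the `AkkaVersion` value name is only an illustrative choice.

```scala
// build.sbt
// Same artifacts and versions as the Maven snippet, in the Test scope.
val AkkaVersion = "2.6.15"

libraryDependencies ++= Seq(
  // %% appends the Scala binary version (e.g. _2.13) to the artifact name
  "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.1.1" % Test,
  "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
)
```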
Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.
The table below shows the direct dependencies of the Alpakka Kafka testkit.
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream-kafka_2.13 | 2.1.1 |
com.typesafe.akka | akka-stream-testkit_2.13 | 2.6.15 |
org.scala-lang | scala-library | 2.13.2 |
Running Kafka with your tests
The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster using Testcontainers (Docker).
The table below helps you choose the right Testkit implementation for your programming language, testing framework, and whether you use Docker containers. You must mix these types into, or extend them in, your test classes to use them. See the documentation for each type for more details.
Type | Test Framework | Cluster | Lang | Lifetime |
---|---|---|---|---|
akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test | JUnit 4 | Yes | Java | All tests, Per class |
akka.kafka.testkit.javadsl.TestcontainersKafkaTest | JUnit 5 | Yes | Java | All tests, Per class |
akka.kafka.testkit.scaladsl.TestcontainersKafkaLike | ScalaTest | Yes | Scala | All tests |
akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike | ScalaTest | Yes | Scala | Per class |
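As an illustration of the ScalaTest rows in the table, a spec that mixes in `TestcontainersKafkaLike` might look like the sketch below. It assumes Docker is available and that the testkit base class exposes the helpers `createTopic()`, `createGroupId()`, `producerDefaults` and `consumerDefaults` (with consumers starting from the earliest offset), as recalled from the 2.1.x testkit; check these names against the API documentation for your version. The class names `SpecBase` and `TestcontainersSampleSpec` are hypothetical.

```scala
import akka.Done
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.kafka.testkit.scaladsl.{ScalatestKafkaSpec, TestcontainersKafkaLike}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

// Shared base class. The port -1 is a placeholder: the Testcontainers
// trait is expected to supply the real broker address.
abstract class SpecBase(kafkaPort: Int)
    extends ScalatestKafkaSpec(kafkaPort)
    with AnyWordSpecLike
    with Matchers
    with ScalaFutures
    with IntegrationPatience
    with Eventually {

  protected def this() = this(kafkaPort = -1)
}

// One Kafka container is shared by all specs that mix in TestcontainersKafkaLike.
class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {

  "a containerized Kafka broker" should {
    "round-trip produced records" in {
      val topic = createTopic()     // assumed testkit helper
      val group = createGroupId()   // assumed testkit helper

      // Write ten records to the topic and wait for the sink to complete.
      val produced = Source(1 to 10)
        .map(n => new ProducerRecord[String, String](topic, n.toString))
        .runWith(Producer.plainSink(producerDefaults))
      produced.futureValue shouldBe Done

      // Read the same ten records back.
      val consumed = Consumer
        .plainSource(consumerDefaults.withGroupId(group), Subscriptions.topics(topic))
        .take(10)
        .map(_.value)
        .runWith(Sink.seq)
      consumed.futureValue should contain theSameElementsAs (1 to 10).map(_.toString)
    }
  }
}
```

Swapping `TestcontainersKafkaLike` for `TestcontainersKafkaPerClassLike` should give each test class its own container, at the cost of a slower start-up per class.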
Alternative testing libraries
If you use Maven and Java, an alternative library that provides a running Kafka broker instance during testing is kafka-junit by Salesforce. It supports JUnit 4 and 5 and many different versions of Kafka.
Mocking the Consumer or Producer
The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and Producer flows in akka.kafka.testkit.ProducerResultFactory.
To create the materialized value of Consumer sources, ConsumerControlFactory offers a wrapped KillSwitch.
- Scala
    import akka.NotUsed
    import akka.kafka.ConsumerMessage.CommittableOffset
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.testkit.scaladsl.ConsumerControlFactory
    import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory}
    import akka.kafka.{ConsumerMessage, ProducerMessage}
    import akka.stream.scaladsl.{Flow, Keep, Source}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.ProducerRecord

    // startOffset, topic, partition, groupId, targetTopic and committerSettings
    // are expected to be defined by the enclosing test.

    // create elements emitted by the mocked Consumer
    val elements = (0 to 10).map { i =>
      val nextOffset = startOffset + i
      ConsumerResultFactory.committableMessage(
        new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"),
        ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i")
      )
    }

    // create a source imitating the Consumer.committableSource
    val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
      Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

    // create a flow imitating the Producer.flexiFlow
    val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
                                      ProducerMessage.Results[String, String, CommittableOffset],
                                      NotUsed] =
      Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
        .map {
          case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
            ProducerResultFactory.result(msg)
          case other => throw new Exception(s"excluded: $other")
        }

    // run the flow as if it was connected to a Kafka broker
    val (control, streamCompletion) = mockedKafkaConsumerSource
      .map(
        msg =>
          ProducerMessage.Message(
            new ProducerRecord[String, String](targetTopic, msg.record.value),
            msg.committableOffset
          )
      )
      .via(mockedKafkaProducerFlow)
      .map(_.passThrough)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .run()
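To show the mocked Consumer and its control in isolation, the following self-contained sketch runs a mocked source without any broker, collects the record values, and then calls `shutdown()` on the materialized control. The object name `MockedConsumerCheck` and the topic, partition and group values are hypothetical and chosen only for this illustration.

```scala
import akka.actor.ActorSystem
import akka.kafka.testkit.ConsumerResultFactory
import akka.kafka.testkit.scaladsl.ConsumerControlFactory
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.apache.kafka.clients.consumer.ConsumerRecord

import scala.concurrent.Await
import scala.concurrent.duration._

object MockedConsumerCheck {
  // Hypothetical values used only in this example.
  private val topic = "orders"
  private val partition = 0
  private val groupId = "group-1"

  /** Runs a mocked consumer stream and returns the record values it emitted. */
  def receivedValues(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("mocked-consumer-check")
    try {
      // Three committable messages with offsets 0, 1 and 2.
      val elements = (0L until 3L).map { offset =>
        ConsumerResultFactory.committableMessage(
          new ConsumerRecord[String, String](topic, partition, offset, "key", s"value $offset"),
          ConsumerResultFactory.committableOffset(groupId, topic, partition, offset, s"metadata $offset")
        )
      }

      // Keep.right exposes the mocked Consumer.Control as the materialized value.
      val (control, values) = Source(elements)
        .viaMat(ConsumerControlFactory.controlFlow())(Keep.right)
        .map(_.record.value)
        .toMat(Sink.seq)(Keep.both)
        .run()

      val received = Await.result(values, 3.seconds)
      // The mocked control wraps a KillSwitch, so shutdown() completes without a broker.
      Await.result(control.shutdown(), 3.seconds)
      received
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Because the source is built from an in-memory collection, the stream completes on its own; with an unbounded mocked source, calling `shutdown()` on the control is what would stop it.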