Testing
To simplify testing of streaming integrations, Alpakka Kafka provides the Alpakka Kafka testkit. It helps with
- mocking the Alpakka Kafka Consumers and Producers
- using an embedded Kafka broker
- starting and stopping Kafka in Docker
Project Info: Alpakka Kafka testkit | |
---|---|
Artifact | com.typesafe.akka / akka-stream-kafka-testkit / 1.0.5 |
JDK versions | Adopt OpenJDK 8 with Hotspot, Adopt OpenJDK 11 with Hotspot |
Scala versions | 2.12.8, 2.11.12, 2.13.0 |
JPMS module name | akka.stream.alpakka.kafka.testkit |
License | |
Readiness level | Since 1.0-M1, 2018-11-06. Note: The API of the testkit may change even for minor versions. |
Home page | https://doc.akka.io/docs/alpakka-kafka/current |
Forums | |
Release notes | In the documentation |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka-kafka |
- Maven
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream-kafka-testkit_2.12</artifactId>
    <version>1.0.5</version>
  </dependency>
- sbt
- Gradle
Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.
The table below shows Alpakka Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively. We’ve overridden the commons-compress library to use a version with fewer known security vulnerabilities.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.typesafe.akka | akka-stream-kafka_2.12 | 1.0.5 | Apache-2.0 |
com.typesafe.akka | akka-stream-testkit_2.12 | 2.5.23 | Apache License, Version 2.0 |
io.github.embeddedkafka | embedded-kafka_2.12 | 2.1.1 | MIT |
org.apache.commons | commons-compress | 1.18 | Apache License, Version 2.0 |
org.apache.commons | commons-compress | 1.8.1 | |
org.apache.kafka | kafka_2.12 | 2.1.1 | The Apache Software License, Version 2.0 |
org.scala-lang | scala-library | 2.12.8 | Apache-2.0 |
- Dependency tree
Mocking the Consumer or Producer
The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and by Producer flows in akka.kafka.testkit.ProducerResultFactory.
To create the materialized value of Consumer sources, akka.kafka.testkit.javadsl.ConsumerControlFactory offers a wrapped KillSwitch.
- Scala
- Java
  import akka.kafka.testkit.ConsumerResultFactory;
  import akka.kafka.testkit.ProducerResultFactory;
  import akka.kafka.testkit.javadsl.ConsumerControlFactory;

  // create elements emitted by the mocked Consumer
  List<ConsumerMessage.CommittableMessage<String, String>> elements =
      Arrays.asList(
          ConsumerResultFactory.committableMessage(
              new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
              ConsumerResultFactory.committableOffset(
                  groupId, topic, partition, startOffset, "metadata")),
          ConsumerResultFactory.committableMessage(
              new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
              ConsumerResultFactory.committableOffset(
                  groupId, topic, partition, startOffset + 1, "metadata 2")));

  // create a source imitating the Consumer.committableSource
  Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
      mockedKafkaConsumerSource =
          Source.cycle(elements::iterator)
              .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

  // create a flow imitating the Producer.flexiFlow
  Flow<
          ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
          ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
          NotUsed>
      mockedKafkaProducerFlow =
          Flow
              .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                  create()
              .map(
                  msg -> {
                    if (msg instanceof ProducerMessage.Message) {
                      ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                          msg2 =
                              (ProducerMessage.Message<
                                      String, String, ConsumerMessage.CommittableOffset>)
                                  msg;
                      return ProducerResultFactory.result(msg2);
                    } else throw new RuntimeException("unexpected element: " + msg);
                  });

  // run the flow as if it was connected to a Kafka broker
  Pair<Consumer.Control, CompletionStage<Done>> stream =
      mockedKafkaConsumerSource
          .map(
              msg -> {
                ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                    message =
                        new ProducerMessage.Message<>(
                            new ProducerRecord<>(
                                targetTopic, msg.record().key(), msg.record().value()),
                            msg.committableOffset());
                return message;
              })
          .via(mockedKafkaProducerFlow)
          .map(ProducerMessage.Results::passThrough)
          .toMat(Committer.sink(committerSettings), Keep.both())
          .run(mat);
Testing with an embedded Kafka server
The Embedded Kafka library is an important tool for testing the Alpakka Kafka connector, as it makes it easy to start and stop Kafka brokers from test cases.
As Kafka uses Scala internally, only the Scala versions supported by Kafka can be used together with Embedded Kafka. To be independent of Kafka’s supported Scala versions, run Kafka in a Docker container.
The helpers for running Embedded Kafka are available for Scala 2.11 and 2.12.
The testkit contains helper classes used by the tests in the Alpakka Kafka connector and may be used for other testing, as well.
Testing with Avro and Schema Registry
If you need to run tests using Confluent’s Schema Registry, you might include embedded-kafka-schema-registry instead.
Testing with an embedded Kafka server from Java code
Test classes may extend akka.kafka.testkit.javadsl.EmbeddedKafkaTest (JUnit 5) or akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test (JUnit 4) to automatically start and stop an embedded Kafka broker.
Furthermore, these base classes provide
- preconfigured consumer settings (ConsumerSettings<String, String> consumerDefaults),
- preconfigured producer settings (ProducerSettings<String, String> producerDefaults),
- unique topic creation (createTopic(int number, int partitions, int replication)), and
- a CompletionStage value extraction helper (<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)).
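The value extraction helper listed above lets a test block until an asynchronous result is available. The following is only a rough sketch of that idea, not the testkit's implementation: the class name StageResults and the choice of exception types are assumptions made for illustration, and only the JDK is used.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in, NOT part of the Alpakka Kafka testkit.
final class StageResults {
    private StageResults() {}

    // Blocks the calling thread until the stage completes or the timeout elapses.
    static <T> T resultOf(CompletionStage<T> stage, Duration timeout) {
        try {
            return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the stage", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("stage failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("stage did not complete within " + timeout, e);
        }
    }

    public static void main(String[] args) {
        CompletionStage<String> stage = CompletableFuture.supplyAsync(() -> "done");
        System.out.println(resultOf(stage, Duration.ofSeconds(5)));
    }
}
```

In a test, a helper of this shape turns the CompletionStage materialized by a stream into a plain value that can be passed to ordinary assertions.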
The example below shows skeleton test classes for JUnit 4 and JUnit 5.
- Java JUnit 4
  import akka.actor.ActorSystem;
  import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;
  import akka.stream.ActorMaterializer;
  import akka.stream.Materializer;
  import akka.testkit.javadsl.TestKit;
  import org.junit.AfterClass;
  import org.junit.Test;

  public class AssignmentTest extends EmbeddedKafkaJunit4Test {

    private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
    private static final Materializer mat = ActorMaterializer.create(sys);

    public AssignmentTest() {
      // KafkaPorts is defined outside this snippet; it supplies the broker port for this test class
      super(sys, mat, KafkaPorts.AssignmentTest());
    }

    @AfterClass
    public static void afterClass() {
      TestKit.shutdownActorSystem(sys);
    }
  }
- Java JUnit 5
The JUnit test base classes run the assertAllStagesStopped check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
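As a sketch, a test-scoped application.conf could shorten the timeout as follows. The key is the consumer's stop-timeout setting under akka.kafka.consumer; the file location and the value of 10 ms are assumptions chosen for illustration and may need tuning for your tests.

```hocon
# e.g. src/test/resources/application.conf (assumed location, test scope only)
akka.kafka.consumer {
  # Illustrative value, an assumption rather than a recommendation
  stop-timeout = 10 ms
}
```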
Testing with an embedded Kafka server from Scala code
The KafkaSpec class offers access to
- preconfigured consumer settings (consumerDefaults: ConsumerSettings[String, String]),
- preconfigured producer settings (producerDefaults: ProducerSettings[String, String]),
- unique topic creation (createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)),
- an implicit LoggingAdapter for use with the log() operator, and
- other goodies.
EmbeddedKafkaLike extends KafkaSpec to add automatic starting and stopping of the embedded Kafka broker.
Some Alpakka Kafka tests implemented in Scala use Scalatest with the mix-ins shown below. You need to add Scalatest explicitly to your test dependencies (this release of Alpakka Kafka uses Scalatest 3.0.8).
- Maven
  <dependency>
    <groupId>org.scalatest</groupId>
    <artifactId>scalatest_2.12</artifactId>
    <version>3.0.8</version>
    <scope>test</scope>
  </dependency>
- sbt
- Gradle
- Scala
  import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
  import org.scalatest.concurrent.{Eventually, ScalaFutures}
  import org.scalatest.{Matchers, WordSpecLike}

  abstract class SpecBase(kafkaPort: Int)
      extends ScalatestKafkaSpec(kafkaPort)
      with WordSpecLike
      with Matchers
      with ScalaFutures
      with Eventually {

    protected def this() = this(kafkaPort = -1)
  }
By mixing in EmbeddedKafkaLike, an embedded Kafka instance will be started before the tests in this test class execute and shut down after all tests in this test class are finished.
- Scala
  import akka.kafka.testkit.scaladsl.EmbeddedKafkaLike
  import net.manub.embeddedkafka.EmbeddedKafkaConfig

  class EmbeddedKafkaSampleSpec extends SpecBase(kafkaPort = 1234) with EmbeddedKafkaLike {

    // if a specific Kafka broker configuration is desired
    override def createKafkaConfig: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(kafkaPort,
                          zooKeeperPort,
                          Map(
                            "offsets.topic.replication.factor" -> "1",
                            "offsets.retention.minutes" -> "1",
                            "offsets.retention.check.interval.ms" -> "100"
                          ))

    // ...
  }
Test classes that mix in EmbeddedKafkaLike this way automatically start and stop a Kafka broker to test with. To use a non-default Kafka broker configuration, override createKafkaConfig as shown above.
To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
Testing with Kafka in Docker
The Testcontainers project contains a nice API to start and stop Apache Kafka in Docker containers. This becomes very relevant when your application code uses a Scala version which Apache Kafka doesn’t support so that EmbeddedKafka can’t be used.
The Testcontainers support is new to Alpakka Kafka since 1.0.2 and may evolve a bit more.
Testing with Kafka in Docker from Java code
The Alpakka Kafka testkit contains helper classes to start Kafka via Testcontainers. Alternatively, you may use just Testcontainers, as it is designed to be used with JUnit and you can follow their documentation to start and stop Kafka. To start a single instance for many tests see Singleton containers.
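The singleton container idea mentioned above comes down to starting the container in a static initializer of a shared base class, so that every test class extending it reuses one running instance. The sketch below shows only that pattern using the JDK: FakeKafkaContainer is a hypothetical stand-in for a real Testcontainers Kafka container, and all class names and the returned address are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a Testcontainers Kafka container; it only counts starts.
final class FakeKafkaContainer {
    static final AtomicInteger STARTS = new AtomicInteger();

    void start() {
        STARTS.incrementAndGet();
    }

    String getBootstrapServers() {
        return "PLAINTEXT://localhost:9092"; // placeholder address
    }
}

// Shared base class: the static initializer runs once per JVM,
// no matter how many test classes extend it.
abstract class SharedKafkaBase {
    static final FakeKafkaContainer KAFKA;

    static {
        KAFKA = new FakeKafkaContainer();
        KAFKA.start();
    }

    String bootstrapServers() {
        return KAFKA.getBootstrapServers();
    }
}

final class FirstSuite extends SharedKafkaBase {}

final class SecondSuite extends SharedKafkaBase {}

final class SingletonContainerDemo {
    public static void main(String[] args) {
        new FirstSuite();
        new SecondSuite();
        System.out.println("container starts: " + FakeKafkaContainer.STARTS.get());
    }
}
```

With a real container in place of the stand-in, the base class deliberately has no explicit stop call; the Testcontainers documentation on singleton containers describes how the container is cleaned up.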
The Testcontainers dependency must be added to your project explicitly.
- Maven
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.11.2</version>
    <scope>test</scope>
  </dependency>
- sbt
- Gradle
The example below shows skeleton test classes for JUnit 4 and JUnit 5. The Kafka broker will start before the first test and be stopped after all test classes are finished.
- Java JUnit 4
  import akka.actor.ActorSystem;
  import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
  import akka.stream.ActorMaterializer;
  import akka.stream.Materializer;
  import akka.testkit.javadsl.TestKit;
  import org.junit.AfterClass;
  import org.junit.Test;

  public class AssignmentWithTestcontainersTest extends TestcontainersKafkaJunit4Test {

    private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
    private static final Materializer mat = ActorMaterializer.create(sys);

    public AssignmentWithTestcontainersTest() {
      super(sys, mat);
    }

    @AfterClass
    public static void afterClass() {
      TestKit.shutdownActorSystem(sys);
    }
  }
- Java JUnit 5
Testing with Kafka in Docker from Scala code
The Testcontainers dependency must be added to your project explicitly.
- Maven
  <dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.11.2</version>
    <scope>test</scope>
  </dependency>
- sbt
- Gradle
By mixing in TestcontainersKafkaLike, the Kafka Docker container will be started before the first test and shut down after all tests are finished.
- Scala
  import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
  import org.scalatest.concurrent.{Eventually, ScalaFutures}
  import org.scalatest.{Matchers, WordSpecLike}

  abstract class SpecBase(kafkaPort: Int)
      extends ScalatestKafkaSpec(kafkaPort)
      with WordSpecLike
      with Matchers
      with ScalaFutures
      with Eventually {

    protected def this() = this(kafkaPort = -1)
  }

  import akka.kafka.testkit.scaladsl.TestcontainersKafkaLike

  class TestcontainersSampleSpec extends SpecBase with TestcontainersKafkaLike {
    // ...
  }
Test classes that follow the TestcontainersSampleSpec pattern automatically get a Kafka broker to test with, without managing its lifecycle themselves.
To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
Alternative testing libraries
If using Maven and Java, an alternative library that provides a running Kafka broker instance during testing is kafka-junit by Salesforce. It supports JUnit 4 and 5 and many different versions of Kafka.