Testing
To simplify testing of streaming integrations with Alpakka Kafka, it provides the Alpakka Kafka testkit.
Project Info: Alpakka Kafka testkit | |
---|---|
Artifact | com.typesafe.akka
akka-stream-kafka-testkit
1.0
|
JDK versions | Adopt OpenJDK 8 with Hotspot Adopt OpenJDK 11 with Hotspot |
Scala versions | 2.12.8, 2.11.12 |
JPMS module name | akka.stream.alpakka.kafka.testkit |
License | |
Readiness level |
Since 1.0-M1, 2018-11-06
Note: The API of the testkit may change even for minor versions.
|
Home page | https://doc.akka.io/docs/alpakka-kafka/current/ |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka-kafka |
- Maven
<dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream-kafka-testkit_2.12</artifactId> <version>1.0</version> </dependency>
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "1.0"
- Gradle
dependencies { compile group: 'com.typesafe.akka', name: 'akka-stream-kafka-testkit_2.12', version: '1.0' }
Note that Akka testkits do not promise binary compatibility. The API might be changed even between minor versions.
The table below shows Alpakka Kafka testkits’s direct dependencies and the second tab shows all libraries it depends on transitively. We’ve overriden the commons-compress
library to use a version with fewer known security vulnerabilities.
- Direct dependencies
Organization Artifact Version License com.typesafe.akka akka-stream-kafka_2.12 1.0 Apache-2.0 com.typesafe.akka akka-stream-testkit_2.12 2.5.21 Apache License, Version 2.0 net.manub scalatest-embedded-kafka_2.12 2.0.0 MIT org.apache.commons commons-compress 1.18 Apache License, Version 2.0 org.apache.kafka kafka_2.12 2.1.0 The Apache Software License, Version 2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 - Dependency tree
com.typesafe.akka akka-stream-kafka_2.12 1.0 Apache-2.0 com.typesafe.akka akka-stream_2.12 2.5.21 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.21 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe.akka akka-protobuf_2.12 2.5.21 Apache License, Version 2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.apache.kafka kafka-clients 2.1.0 The Apache Software License, Version 2.0 com.github.luben zstd-jni 1.3.5-4 BSD 2-Clause License org.lz4 lz4-java 1.5.0 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.xerial.snappy snappy-java 1.1.7.2 The Apache Software License, Version 2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe.akka akka-stream-testkit_2.12 2.5.21 Apache License, Version 2.0 com.typesafe.akka akka-stream_2.12 2.5.21 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.21 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe.akka akka-protobuf_2.12 2.5.21 Apache License, Version 2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.8 Apache-2.0 com.typesafe.akka akka-testkit_2.12 2.5.21 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.21 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 net.manub scalatest-embedded-kafka_2.12 2.0.0 MIT org.apache.avro avro 1.8.2 The Apache Software License, Version 2.0 com.thoughtworks.paranamer paranamer 2.7 BSD org.apache.commons commons-compress 1.18 Apache License, Version 2.0 org.codehaus.jackson jackson-core-asl 1.9.13 The Apache Software License, Version 2.0 org.codehaus.jackson jackson-mapper-asl 1.9.13 The Apache Software License, Version 2.0 org.codehaus.jackson jackson-core-asl 1.9.13 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.tukaani xz 1.5 Public Domain org.xerial.snappy snappy-java 1.1.7.2 The Apache Software License, Version 2.0 org.apache.kafka kafka_2.12 2.1.0 The Apache Software License, Version 2.0 com.101tec zkclient 0.10 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License com.fasterxml.jackson.core jackson-databind 2.9.7 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.9.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.9.7 The Apache Software License, Version 2.0 com.typesafe.scala-logging scala-logging_2.12 3.9.0 Apache 2.0 License org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-reflect 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.slf4j slf4j-api 1.7.25 MIT License com.yammer.metrics metrics-core 2.2.0 Apache License 2.0 org.slf4j slf4j-api 1.7.25 MIT License net.sf.jopt-simple jopt-simple 5.0.4 The MIT License org.apache.kafka kafka-clients 2.1.0 The Apache Software License, Version 2.0 com.github.luben zstd-jni 1.3.5-4 BSD 2-Clause License org.lz4 lz4-java 1.5.0 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.xerial.snappy snappy-java 1.1.7.2 The Apache Software License, Version 2.0 org.apache.zookeeper zookeeper 3.4.13 The Apache Software License, Version 2.0 org.apache.yetus audience-annotations 0.5.0 Apache License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-reflect 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.slf4j slf4j-api 1.7.25 MIT License org.scala-lang scala-library 2.12.8 Apache-2.0 org.apache.commons commons-compress 1.18 Apache License, Version 2.0 org.apache.kafka kafka_2.12 2.1.0 The Apache Software License, Version 2.0 com.101tec zkclient 0.10 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License com.fasterxml.jackson.core jackson-databind 2.9.7 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.9.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.9.7 The Apache Software License, Version 2.0 com.typesafe.scala-logging scala-logging_2.12 3.9.0 Apache 2.0 License org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-reflect 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.slf4j slf4j-api 1.7.25 MIT License com.yammer.metrics metrics-core 2.2.0 Apache License 2.0 org.slf4j slf4j-api 1.7.25 MIT License net.sf.jopt-simple jopt-simple 5.0.4 The MIT License org.apache.kafka kafka-clients 2.1.0 The Apache Software License, Version 2.0 com.github.luben zstd-jni 1.3.5-4 BSD 2-Clause License org.lz4 lz4-java 1.5.0 The Apache Software License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.xerial.snappy snappy-java 1.1.7.2 The Apache Software License, Version 2.0 org.apache.zookeeper zookeeper 3.4.13 The Apache Software License, Version 2.0 org.apache.yetus audience-annotations 0.5.0 Apache License, Version 2.0 org.slf4j slf4j-api 1.7.25 MIT License org.scala-lang scala-library 2.12.8 Apache-2.0 org.scala-lang scala-reflect 2.12.8 Apache-2.0 org.scala-lang scala-library 2.12.8 Apache-2.0 org.slf4j slf4j-api 1.7.25 MIT License org.scala-lang scala-library 2.12.8 Apache-2.0
Mocking the Consumer or Producer
The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory
and Producer flows in akka.kafka.testkit.ProducerResultFactory
.
To create the materialized value of Consumer sources, akka.kafka.testkit.scaladsl.ConsumerControlFactory
akka.kafka.testkit.javadsl.ConsumerControlFactory
offers a wrapped KillSwitch
.
- Scala
-
import akka.kafka.testkit.scaladsl.ConsumerControlFactory import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory} // create elements emitted by the mocked Consumer val elements = immutable.Seq( ConsumerResultFactory.committableMessage( new ConsumerRecord(topic, partition, startOffset, "key", "value 1"), ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset, "metadata") ), ConsumerResultFactory.committableMessage( new ConsumerRecord(topic, partition, startOffset + 1, "key", "value 2"), ConsumerResultFactory.committableOffset(groupId, topic, partition, startOffset + 1, "metadata 2") ) ) // create a source imitating the Consumer.committableSource val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] = Source .cycle(() => elements.iterator) .viaMat(ConsumerControlFactory.controlFlow())(Keep.right) // create a source imitating the Producer.flexiFlow val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset], ProducerMessage.Results[String, String, CommittableOffset], NotUsed] = Flow[ProducerMessage.Envelope[String, String, CommittableOffset]] .map { case msg: ProducerMessage.Message[String, String, CommittableOffset] => ProducerResultFactory.result(msg) case other => throw new Exception(s"excluded: $other") } // run the flow as if it was connected to a Kafka broker val (control, streamCompletion) = mockedKafkaConsumerSource .map( msg => ProducerMessage.Message( new ProducerRecord[String, String](targetTopic, msg.record.value), msg.committableOffset ) ) .via(mockedKafkaProducerFlow) .map(_.passThrough) .toMat(Committer.sink(committerSettings))(Keep.both) .run()
- Java
-
import akka.kafka.testkit.ConsumerResultFactory; import akka.kafka.testkit.ProducerResultFactory; import akka.kafka.testkit.javadsl.ConsumerControlFactory; // create elements emitted by the mocked Consumer List<ConsumerMessage.CommittableMessage<String, String>> elements = Arrays.asList( ConsumerResultFactory.committableMessage( new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"), ConsumerResultFactory.committableOffset( groupId, topic, partition, startOffset, "metadata")), ConsumerResultFactory.committableMessage( new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"), ConsumerResultFactory.committableOffset( groupId, topic, partition, startOffset + 1, "metadata 2"))); // create a source imitating the Consumer.committableSource Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control> mockedKafkaConsumerSource = Source.cycle(elements::iterator) .viaMat(ConsumerControlFactory.controlFlow(), Keep.right()); // create a source imitating the Producer.flexiFlow Flow< ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>, ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>, NotUsed> mockedKafkaProducerFlow = Flow .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>> create() .map( msg -> { if (msg instanceof ProducerMessage.Message) { ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset> msg2 = (ProducerMessage.Message< String, String, ConsumerMessage.CommittableOffset>) msg; return ProducerResultFactory.result(msg2); } else throw new RuntimeException("unexpected element: " + msg); }); // run the flow as if it was connected to a Kafka broker Pair<Consumer.Control, CompletionStage<Done>> stream = mockedKafkaConsumerSource .map( msg -> { ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset> message = new ProducerMessage.Message<>( new ProducerRecord<>( targetTopic, msg.record().key(), msg.record().value()), msg.committableOffset()); return message; }) .via(mockedKafkaProducerFlow) .map(ProducerMessage.Results::passThrough) .toMat(Committer.sink(committerSettings), Keep.both()) .run(mat);
Testing with an embedded Kafka server
To test the Alpakka Kafka connector the Embedded Kafka library is an important tool as it helps to easily start and stop Kafka brokers from test cases.
The testkit contains helper classes used by the tests in the Alpakka Kafka connector and may be used for other testing, as well.
Testing with Avro and Schema Registry
If you need to run tests using Confluent’s Schema Registry, you might include embedded-kafka-schema-registry instead.
Testing from Java code
Test classes may extend akka.kafka.testkit.javadsl.EmbeddedKafkaTest
(JUnit 5) or akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test
(JUnit 4) to automatically start and stop an embedded Kafka broker.
Furthermore it provides
- preconfigured consumer settings (
ConsumerSettings<String, String> consumerDefaults
), - preconfigured producer settings (
ProducerSettings<String, String> producerDefaults
), - unique topic creation (
createTopic(int number, int partitions, int replication)
), and CompletionStage
value extraction helper (<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)
).
The example below shows skeleton test classes for JUnit 4 and JUnit 5.
- Java JUnit 4
-
import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.Test; public class AssignmentTest extends EmbeddedKafkaJunit4Test { private static final ActorSystem sys = ActorSystem.create("AssignmentTest"); private static final Materializer mat = ActorMaterializer.create(sys); public AssignmentTest() { super(sys, mat, KafkaPorts.AssignmentTest()); } @AfterClass public static void afterClass() { TestKit.shutdownActorSystem(sys); } }
- Java JUnit 5
-
import akka.kafka.testkit.javadsl.EmbeddedKafkaTest; import akka.testkit.javadsl.TestKit; import org.junit.jupiter.api.*; import org.junit.jupiter.api.TestInstance.Lifecycle; @TestInstance(Lifecycle.PER_CLASS) class ProducerExampleTest extends EmbeddedKafkaTest { private static final ActorSystem system = ActorSystem.create("ProducerExampleTest"); private static final Materializer materializer = ActorMaterializer.create(system); ProducerExampleTest() { super(system, materializer, KafkaPorts.JavaProducerExamples()); } @AfterAll void shutdownActorSystem() { TestKit.shutdownActorSystem(system); } }
Testing from Scala code
The KafkaSpec
class offers access to * preconfigured consumer settings (ConsumerSettings<String, String> consumerDefaults
), * preconfigured producer settings (ProducerSettings<String, String> producerDefaults
), * unique topic creation (createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)
), * an implicit LoggingAdapter
for use with the log()
operator, and * other goodies.
EmbeddedKafkaLike
extends KafkaSpec
to add automatic starting and stopping of the embedded Kafka broker.
Most Alpakka Kafka tests implemented in Scala use Scalatest with the mix-ins shown below. You need to add Scalatest explicitly in your test dependencies (this release of Alpakka Kafka uses Scalatest 3.0.5.)
- Scala
-
import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec} import org.scalatest.concurrent.{Eventually, ScalaFutures} import org.scalatest.{Matchers, WordSpecLike} abstract class SpecBase(kafkaPort: Int) extends ScalatestKafkaSpec(kafkaPort) with WordSpecLike with EmbeddedKafkaLike with Matchers with ScalaFutures with Eventually
With this SpecBase
class test classes can extend it to automatically start and stop a Kafka broker to test with.
- Scala
-
import akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped import net.manub.embeddedkafka.EmbeddedKafkaConfig class AssignmentSpec extends SpecBase(kafkaPort = KafkaPorts.AssignmentSpec) { implicit val patience = PatienceConfig(15.seconds, 1.second) def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort, Map( "offsets.topic.replication.factor" -> "1" )) }
Alternative testing libraries
If using Maven and Java, an alternative library that provides running Kafka broker instance during testing is kafka-unit by salesforce. It has support for Junit 4 and 5 and supports many different versions of Kafka.