Testing
To simplify testing of streaming integrations, Alpakka Kafka provides the Alpakka Kafka testkit. It provides help for
- Using an embedded Kafka broker (deprecated)
- Using Docker to launch a local Kafka cluster with testcontainers
- Mocking the Alpakka Kafka Consumers and Producers
Embedded Kafka testkit support has been deprecated since 2.0.4 and will be removed in the next minor release. Use testcontainers (Docker) instead.
Project Info: Alpakka Kafka testkit | |
---|---|
Artifact | com.typesafe.akka : akka-stream-kafka-testkit : 2.0.7 |
JDK versions | Adopt OpenJDK 8 with Hotspot, Adopt OpenJDK 11 with Hotspot |
Scala versions | 2.12.10, 2.11.12, 2.13.1 |
JPMS module name | akka.stream.alpakka.kafka.testkit |
License | |
Readiness level | Since 1.0-M1, 2018-11-06. Note: The API of the testkit may change even for minor versions. |
Home page | https://doc.akka.io/docs/alpakka-kafka/current |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka-kafka |
- Maven
    <properties>
      <akka.version>2.5.31</akka.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-stream-kafka-testkit_${scala.binary.version}</artifactId>
      <version>2.0.7</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-stream-testkit_${scala.binary.version}</artifactId>
      <version>${akka.version}</version>
      <scope>test</scope>
    </dependency>
- sbt
    val AkkaVersion = "2.5.31"
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-stream-kafka-testkit" % "2.0.7" % Test,
      "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test
    )
- Gradle
    def versions = [
      AkkaVersion: "2.5.31",
      ScalaBinary: "2.13"
    ]
    dependencies {
      testImplementation group: 'com.typesafe.akka', name: "akka-stream-kafka-testkit_${versions.ScalaBinary}", version: '2.0.7'
      testImplementation group: 'com.typesafe.akka', name: "akka-stream-testkit_${versions.ScalaBinary}", version: versions.AkkaVersion
    }
Note that Akka testkits do not promise binary compatibility. The API might be changed even between patch releases.
The table below shows Alpakka Kafka testkit’s direct dependencies and the second tab shows all libraries it depends on transitively. We’ve overridden the commons-compress library to use a version with fewer known security vulnerabilities.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream-kafka_2.13 | 2.0.7 |
com.typesafe.akka | akka-stream-testkit_2.13 | 2.5.31 |
org.scala-lang | scala-library | 2.13.1 |
- Dependency tree
    com.typesafe.akka akka-stream-kafka_2.13 2.0.7
      com.typesafe.akka akka-stream_2.13 2.5.31
        com.typesafe.akka akka-actor_2.13 2.5.31
          com.typesafe config 1.3.3
          org.scala-lang.modules scala-java8-compat_2.13 0.9.0
            org.scala-lang scala-library 2.13.1
          org.scala-lang scala-library 2.13.1
        com.typesafe.akka akka-protobuf_2.13 2.5.31
          org.scala-lang scala-library 2.13.1
        com.typesafe ssl-config-core_2.13 0.3.8
          com.typesafe config 1.3.3
          org.scala-lang.modules scala-parser-combinators_2.13 1.1.2
            org.scala-lang scala-library 2.13.1
          org.scala-lang scala-library 2.13.1
        org.reactivestreams reactive-streams 1.0.2
        org.scala-lang scala-library 2.13.1
      org.apache.kafka kafka-clients 2.4.1
        com.github.luben zstd-jni 1.4.3-1
        org.lz4 lz4-java 1.6.0
        org.slf4j slf4j-api 1.7.30
        org.xerial.snappy snappy-java 1.1.7.3
      org.scala-lang.modules scala-collection-compat_2.13 2.1.6
        org.scala-lang scala-library 2.13.1
      org.scala-lang scala-library 2.13.1
    com.typesafe.akka akka-stream-testkit_2.13 2.5.31
      com.typesafe.akka akka-stream_2.13 2.5.31
        com.typesafe.akka akka-actor_2.13 2.5.31
          com.typesafe config 1.3.3
          org.scala-lang.modules scala-java8-compat_2.13 0.9.0
            org.scala-lang scala-library 2.13.1
          org.scala-lang scala-library 2.13.1
        com.typesafe.akka akka-protobuf_2.13 2.5.31
          org.scala-lang scala-library 2.13.1
        com.typesafe ssl-config-core_2.13 0.3.8
          com.typesafe config 1.3.3
          org.scala-lang.modules scala-parser-combinators_2.13 1.1.2
            org.scala-lang scala-library 2.13.1
          org.scala-lang scala-library 2.13.1
        org.reactivestreams reactive-streams 1.0.2
        org.scala-lang scala-library 2.13.1
      com.typesafe.akka akka-testkit_2.13 2.5.31
        com.typesafe.akka akka-actor_2.13 2.5.31
          com.typesafe config 1.3.3
          org.scala-lang.modules scala-java8-compat_2.13 0.9.0
            org.scala-lang scala-library 2.13.1
          org.scala-lang scala-library 2.13.1
        org.scala-lang scala-library 2.13.1
      org.scala-lang scala-library 2.13.1
    org.scala-lang scala-library 2.13.1
Running Kafka with your tests
Embedded Kafka testkit support has been deprecated since 2.0.4
The Testkit provides a variety of ways to test your application against a real Kafka broker or cluster. There are two main options: an embedded Kafka broker (deprecated) and Testcontainers (Docker).
The table below helps guide you to the right Testkit implementation depending on your programming language, testing framework, and use (or not) of Docker containers. You must mix in or implement these types into your test classes to use them. See the documentation for each for more details.
Type | Test Framework | Runtime Mode | Cluster | Schema Registry | Lang | Lifetime |
---|---|---|---|---|---|---|
akka.kafka.testkit.javadsl.EmbeddedKafkaTest | | | | | | |
akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test | | | | | | |
akka.kafka.testkit.scaladsl.EmbeddedKafkaLike | | | | | | |
akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test | JUnit 4 | Testcontainers | Yes | Yes | Java | All tests, Per class |
akka.kafka.testkit.javadsl.TestcontainersKafkaTest | JUnit 5 | Testcontainers | Yes | Yes | Java | All tests, Per class |
akka.kafka.testkit.scaladsl.TestcontainersKafkaLike | ScalaTest | Testcontainers | Yes | Yes | Scala | All tests |
akka.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike | ScalaTest | Testcontainers | Yes | Yes | Scala | Per class |
Testing with an embedded Kafka server
Embedded Kafka testkit support has been deprecated since 2.0.4
The Embedded Kafka library is an important tool for testing the Alpakka Kafka connector, as it makes it easy to start and stop Kafka brokers from test cases.
Add Embedded Kafka to your test dependencies:
- Maven
    <dependency>
      <groupId>io.github.embeddedkafka</groupId>
      <artifactId>embedded-kafka_2.12</artifactId>
      <version>2.4.1.1</version>
      <scope>test</scope>
    </dependency>
- sbt
    libraryDependencies += "io.github.embeddedkafka" % "embedded-kafka_2.12" % "2.4.1.1" % Test
- Gradle
    dependencies {
      testImplementation group: 'io.github.embeddedkafka', name: 'embedded-kafka_2.12', version: '2.4.1.1'
    }
As Kafka uses Scala internally, only the Scala versions supported by Kafka can be used together with Embedded Kafka. To be independent of Kafka’s supported Scala versions, run Kafka in a Docker container.
The helpers for running Embedded Kafka are available for Scala 2.11 and 2.12.
The testkit contains helper classes used by the tests in the Alpakka Kafka connector and may be used for other testing, as well.
Testing with Avro and Schema Registry
Embedded Kafka testkit support has been deprecated since 2.0.4
If you need to run tests using Confluent’s Schema Registry, you might include embedded-kafka-schema-registry instead.
Testing with Avro and Schema Registry from Java code
Embedded Kafka testkit support has been deprecated since 2.0.4
Test classes may extend EmbeddedKafkaTest (JUnit 5) or EmbeddedKafkaJunit4Test (JUnit 4) to automatically start and stop an embedded Kafka broker.
Furthermore, it provides
- preconfigured consumer settings (ConsumerSettings<String, String> consumerDefaults),
- preconfigured producer settings (ProducerSettings<String, String> producerDefaults),
- unique topic creation (createTopic(int number, int partitions, int replication)), and
- a CompletionStage value extraction helper (<T> T resultOf(CompletionStage<T> stage, java.time.Duration timeout)).
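The value extraction helper in the list above blocks on a CompletionStage for a bounded time. The sketch below shows, using only the Java standard library, what such a helper can look like. ResultOfSketch is a hypothetical class made up for illustration; it is not the testkit's implementation, and the real helper may report failures and timeouts differently.

```java
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the testkit helper, for illustration only.
final class ResultOfSketch {

  // Blocks until the stage completes or the timeout elapses, then returns its value.
  static <T> T resultOf(CompletionStage<T> stage, Duration timeout) {
    try {
      return stage.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag before surfacing the problem to the test.
      Thread.currentThread().interrupt();
      throw new RuntimeException("interrupted while waiting for the stage", e);
    } catch (ExecutionException | TimeoutException e) {
      // Wrap checked exceptions so test code does not need a throws clause.
      throw new RuntimeException("stage failed or did not complete within " + timeout, e);
    }
  }
}
```

In a test, a call such as resultOf(someStage, Duration.ofSeconds(5)) then either yields the value or fails the test with the underlying cause attached.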
The example below shows skeleton test classes for JUnit 4 and JUnit 5.
- Java JUnit 4
    import akka.actor.ActorSystem;
    import akka.kafka.testkit.javadsl.EmbeddedKafkaJunit4Test;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.testkit.javadsl.TestKit;
    import org.junit.AfterClass;
    import org.junit.Rule;
    import org.junit.Test;

    // LogCapturingJunit4 and KafkaPorts are helpers that are not part of this snippet.
    public class AssignmentTest extends EmbeddedKafkaJunit4Test {

      @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();

      private static final ActorSystem sys = ActorSystem.create("AssignmentTest");
      private static final Materializer mat = ActorMaterializer.create(sys);

      public AssignmentTest() {
        super(sys, mat, KafkaPorts.AssignmentTest());
      }

      @AfterClass
      public static void afterClass() {
        TestKit.shutdownActorSystem(sys);
      }
    }
- Java JUnit 5
    import akka.actor.ActorSystem;
    import akka.kafka.testkit.javadsl.EmbeddedKafkaTest;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.testkit.javadsl.TestKit;
    import org.junit.jupiter.api.*;
    import org.junit.jupiter.api.TestInstance.Lifecycle;

    // KafkaPorts is a helper that is not part of this snippet.
    @TestInstance(Lifecycle.PER_CLASS)
    class ProducerExampleTest extends EmbeddedKafkaTest {

      private static final ActorSystem system = ActorSystem.create("ProducerExampleTest");
      private static final Materializer materializer = ActorMaterializer.create(system);

      ProducerExampleTest() {
        super(system, materializer, KafkaPorts.ProducerExamplesTest());
      }

      @AfterAll
      void shutdownActorSystem() {
        TestKit.shutdownActorSystem(system);
        // executor is a field of the full test class, omitted from this skeleton
        executor.shutdown();
      }
    }
The JUnit test base classes run the assertAllStagesStopped check from Akka Stream testkit to ensure all stages are shut down properly within each test. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
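As a rough illustration of the shorter timeout mentioned above, a test-only application.conf could override the consumer's stop-timeout. The akka.kafka.consumer section is where Alpakka Kafka reads its default consumer settings. The file location and the 1s value are assumptions made for this sketch, not recommendations from the testkit.

```hocon
# src/test/resources/application.conf (assumed location for test-only settings)
akka.kafka.consumer {
  # Example value only: shortens the delay applied when consumer stages shut down.
  stop-timeout = 1s
}
```

Keeping the override in the test resources leaves the production configuration unchanged.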
Testing with Avro and Schema Registry from Scala code
Embedded Kafka testkit support has been deprecated since 2.0.4
The KafkaSpec class offers access to
- preconfigured consumer settings (consumerDefaults: ConsumerSettings[String, String]),
- preconfigured producer settings (producerDefaults: ProducerSettings[String, String]),
- unique topic creation (createTopic(number: Int = 0, partitions: Int = 1, replication: Int = 1)),
- an implicit LoggingAdapter for use with the log() operator, and
- other goodies.
EmbeddedKafkaLike extends KafkaSpec to add automatic starting and stopping of the embedded Kafka broker.
Some Alpakka Kafka tests implemented in Scala use ScalaTest with the mix-ins shown below. You need to add ScalaTest explicitly to your test dependencies (this release of Alpakka Kafka uses ScalaTest 3.0.8).
- Maven
    <dependency>
      <groupId>org.scalatest</groupId>
      <artifactId>scalatest_${scala.binary.version}</artifactId>
      <version>3.0.8</version>
      <scope>test</scope>
    </dependency>
- sbt
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test
- Gradle
    dependencies {
      testImplementation group: 'org.scalatest', name: "scalatest_${versions.ScalaBinary}", version: '3.0.8'
    }
- Scala
    import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
    import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
    import org.scalatest.{Matchers, WordSpecLike}

    abstract class SpecBase(kafkaPort: Int)
        extends ScalatestKafkaSpec(kafkaPort)
        with WordSpecLike
        with Matchers
        with ScalaFutures
        with Eventually {

      protected def this() = this(kafkaPort = -1)
    }
By mixing in EmbeddedKafkaLike, an embedded Kafka instance will be started before the tests in this test class execute and shut down after all tests in this test class are finished.
- Scala
    import akka.kafka.testkit.scaladsl.EmbeddedKafkaLike
    import com.github.ghik.silencer.silent
    import net.manub.embeddedkafka.EmbeddedKafkaConfig

    @silent
    class EmbeddedKafkaSampleSpec extends SpecBase(kafkaPort = 1234) with EmbeddedKafkaLike {

      // if a specific Kafka broker configuration is desired
      override def createKafkaConfig: EmbeddedKafkaConfig =
        EmbeddedKafkaConfig(
          kafkaPort,
          zooKeeperPort,
          Map(
            "offsets.topic.replication.factor" -> "1",
            "offsets.retention.minutes" -> "1",
            "offsets.retention.check.interval.ms" -> "100"
          )
        )

      // ...
    }
Test classes can extend this EmbeddedKafkaSpecBase class to automatically start and stop a Kafka broker to test with. To use a non-default Kafka broker configuration, override createKafkaConfig as shown above.
To ensure proper shutdown of all stages in every test, wrap your test code in assertAllStagesStopped. This may interfere with the stop-timeout which delays shutdown for Alpakka Kafka consumers. You might need to configure a shorter timeout in your application.conf for tests.
Alternative testing libraries
If you use Maven and Java, an alternative library that provides a running Kafka broker instance during testing is kafka-junit by Salesforce. It supports JUnit 4 and 5 and many different versions of Kafka.
Mocking the Consumer or Producer
The testkit contains factories to create the messages emitted by Consumer sources in akka.kafka.testkit.ConsumerResultFactory and Producer flows in akka.kafka.testkit.ProducerResultFactory.
To create the materialized value of Consumer sources, ConsumerControlFactory offers a wrapped KillSwitch.
- Scala
    import akka.NotUsed
    import akka.kafka.ConsumerMessage.CommittableOffset
    import akka.kafka.{ConsumerMessage, ProducerMessage}
    import akka.kafka.scaladsl.{Committer, Consumer}
    import akka.kafka.testkit.scaladsl.ConsumerControlFactory
    import akka.kafka.testkit.{ConsumerResultFactory, ProducerResultFactory}
    import akka.stream.scaladsl.{Flow, Keep, Source}
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.clients.producer.ProducerRecord

    // topic, partition, startOffset, groupId, targetTopic and committerSettings
    // are defined by the surrounding test

    // create elements emitted by the mocked Consumer
    val elements = (0 to 10).map { i =>
      val nextOffset = startOffset + i
      ConsumerResultFactory.committableMessage(
        new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"),
        ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i")
      )
    }

    // create a source imitating the Consumer.committableSource
    val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
      Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

    // create a flow imitating the Producer.flexiFlow
    val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
                                      ProducerMessage.Results[String, String, CommittableOffset],
                                      NotUsed] =
      Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
        .map {
          case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
            ProducerResultFactory.result(msg)
          case other => throw new Exception(s"excluded: $other")
        }

    // run the flow as if it was connected to a Kafka broker
    val (control, streamCompletion) = mockedKafkaConsumerSource
      .map(
        msg =>
          ProducerMessage.Message(
            new ProducerRecord[String, String](targetTopic, msg.record.value),
            msg.committableOffset
          )
      )
      .via(mockedKafkaProducerFlow)
      .map(_.passThrough)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .run()
- Java
    import akka.Done;
    import akka.NotUsed;
    import akka.japi.Pair;
    import akka.kafka.ConsumerMessage;
    import akka.kafka.ProducerMessage;
    import akka.kafka.javadsl.Committer;
    import akka.kafka.javadsl.Consumer;
    import akka.kafka.testkit.ConsumerResultFactory;
    import akka.kafka.testkit.ProducerResultFactory;
    import akka.kafka.testkit.javadsl.ConsumerControlFactory;
    import akka.stream.javadsl.Flow;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.Source;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletionStage;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // topic, partition, startOffset, groupId, targetTopic, committerSettings and mat
    // are defined by the surrounding test

    // create elements emitted by the mocked Consumer
    List<ConsumerMessage.CommittableMessage<String, String>> elements =
        Arrays.asList(
            ConsumerResultFactory.committableMessage(
                new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
                ConsumerResultFactory.committableOffset(
                    groupId, topic, partition, startOffset, "metadata")),
            ConsumerResultFactory.committableMessage(
                new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
                ConsumerResultFactory.committableOffset(
                    groupId, topic, partition, startOffset + 1, "metadata 2")));

    // create a source imitating the Consumer.committableSource
    Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
        mockedKafkaConsumerSource =
            Source.cycle(elements::iterator)
                .viaMat(ConsumerControlFactory.controlFlow(), Keep.right());

    // create a flow imitating the Producer.flexiFlow
    Flow<
            ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
            ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
            NotUsed>
        mockedKafkaProducerFlow =
            Flow
                .<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
                    create()
                .map(
                    msg -> {
                      if (msg instanceof ProducerMessage.Message) {
                        ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset>
                            msg2 =
                                (ProducerMessage.Message<
                                        String, String, ConsumerMessage.CommittableOffset>)
                                    msg;
                        return ProducerResultFactory.result(msg2);
                      } else throw new RuntimeException("unexpected element: " + msg);
                    });

    // run the flow as if it was connected to a Kafka broker
    Pair<Consumer.Control, CompletionStage<Done>> stream =
        mockedKafkaConsumerSource
            .map(
                msg -> {
                  ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
                      message =
                          new ProducerMessage.Message<>(
                              new ProducerRecord<>(
                                  targetTopic, msg.record().key(), msg.record().value()),
                              msg.committableOffset());
                  return message;
                })
            .via(mockedKafkaProducerFlow)
            .map(ProducerMessage.Results::passThrough)
            .toMat(Committer.sink(committerSettings), Keep.both())
            .run(mat);