Testing

Akka Projections provides a TestKit to ease testing. There are two supported styles of test: running with an assert function and driving it with an Akka Streams TestKit TestSubscriber.Probe.

Dependencies

To use the Akka Projections TestKit add the following dependency in your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-testkit" % "1.2.5" % Test
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-testkit_${scala.binary.version}</artifactId>
    <version>1.2.5</version>
    <scope>test</scope>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  testImplementation "com.lightbend.akka:akka-projection-testkit_${versions.ScalaBinary}:1.2.5"
}

Akka Projections require Akka 2.6.18 or later, see Akka version.

Project Info: Akka Projections TestKit
Artifact
com.lightbend.akka
akka-projection-testkit
1.2.5
JDK versions
AdoptOpenJDK 8
AdoptOpenJDK 11
Scala versions2.13.3, 2.12.16
JPMS module nameakka.projection.testkit
License
Readiness level
Since 1.0.0, 2020-09-10
Home pagehttps://akka.io
API documentation
Forums
Release notesGitHub releases
IssuesGitHub issues
Sourceshttps://github.com/akka/akka-projection

Transitive dependencies

The table below shows akka-projection-testkit’s direct dependencies and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.lightbend.akkaakka-projection-core_2.131.2.5
com.typesafe.akkaakka-actor-testkit-typed_2.132.6.18
com.typesafe.akkaakka-stream-testkit_2.132.6.18
org.scala-lang.modulesscala-collection-compat_2.132.5.0
org.scala-langscala-library2.13.3
Dependency tree
com.lightbend.akka    akka-projection-core_2.13    1.2.5
    com.typesafe.akka    akka-actor-typed_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.slf4j    slf4j-api    1.7.32
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.32
    com.typesafe.akka    akka-persistence-query_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-persistence_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
                com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
                com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                    com.typesafe    config    1.4.0    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.3    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.3    CC0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
            com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.3    CC0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
com.typesafe.akka    akka-actor-testkit-typed_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-actor-typed_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.6.18    Apache-2.0
            com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
                com.typesafe    config    1.4.0    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.3    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.slf4j    slf4j-api    1.7.32
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.32
    com.typesafe.akka    akka-slf4j_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.slf4j    slf4j-api    1.7.32
    com.typesafe.akka    akka-testkit_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
com.typesafe.akka    akka-stream-testkit_2.13    2.6.18    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.6.18    Apache-2.0
        com.typesafe    ssl-config-core_2.13    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    com.typesafe.akka    akka-testkit_2.13    2.6.18    Apache-2.0
        com.typesafe.akka    akka-actor_2.13    2.6.18    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.3    Apache-2.0
            org.scala-lang    scala-library    2.13.3    Apache-2.0
        org.scala-lang    scala-library    2.13.3    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
org.scala-lang.modules    scala-collection-compat_2.13    2.5.0    Apache-2.0
    org.scala-lang    scala-library    2.13.3    Apache-2.0
org.scala-lang    scala-library    2.13.3    Apache-2.0

Initializing the Projection TestKit

The Projection TestKit requires an instance of ActorTestKit. We recommend using Akka’s ScalaTestWithActorTestKitTestKitJunitResource

Scala
sourceimport akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.projection.testkit.scaladsl.ProjectionTestKit

class TestKitDocExample extends ScalaTestWithActorTestKit {
  private val projectionTestKit = ProjectionTestKit(system)

}
Java
sourceimport akka.projection.testkit.javadsl.TestSourceProvider;
import org.junit.ClassRule;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.projection.testkit.javadsl.ProjectionTestKit;

@ClassRule static final TestKitJunitResource testKit = new TestKitJunitResource();
ProjectionTestKit projectionTestKit = ProjectionTestKit.create(testKit.system());

Testing with an assert function

When testing with an assert function the Projection is started and stopped by the TestKit. While the projection is running, the assert function will be called until it completes without errors (no exceptions or assertion errors are thrown).

In the example below the Projection will update a CartView. The test will run until it observes that the CartView for id abc-def is available in the repository.

Scala
sourceimport akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.projection.testkit.scaladsl.ProjectionTestKit

projectionTestKit.run(projection) {
  // confirm that cart checkout was inserted in db
  cartViewRepository.findById("abc-def").futureValue
}
Java
sourceprojectionTestKit.run(
    projection,
    () ->
        cartCheckoutRepository
            .findById("abc-def")
            .toCompletableFuture()
            .get(1, TimeUnit.SECONDS));

By default, the test will run for 3 seconds. The assert function will be called every 100 milliseconds. Those values can be modified programatically.

Note: when testing a Projection with this method, the Restart Backoff is disabled. Any backoff configuration settings from .conf file or programmatically added will be overwritten.

Scala
sourceimport scala.concurrent.duration._

projectionTestKit.run(projection, max = 5.seconds, interval = 300.millis) {
  // confirm that cart checkout was inserted in db
  cartViewRepository.findById("abc-def").futureValue
}
Java
sourceimport java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

projectionTestKit.run(
    projection,
    Duration.ofSeconds(5),
    Duration.ofMillis(300),
    () ->
        cartCheckoutRepository
            .findById("abc-def")
            .toCompletableFuture()
            .get(1, TimeUnit.SECONDS));

Testing with a TestSubscriber.Probe

The Akka Stream TestKit can be used to drive the pace of envelopes flowing through the Projection.

The Projection starts as soon as the first element is requested by the TestSubscriber.Probe, new elements will be emitted as requested. The Projection is stopped once the assert function completes.

Scala
sourceprojectionTestKit.runWithTestSink(projection) { sinkProbe =>
  sinkProbe.request(1)
  sinkProbe.expectNext(Done)
}

// confirm that cart checkout was inserted in db
cartViewRepository.findById("abc-def").futureValue
Java
sourceimport static org.junit.Assert.assertEquals;

projectionTestKit.runWithTestSink(
    projection,
    sinkProbe -> {
      sinkProbe.request(1);
      sinkProbe.expectNext(Done.getInstance());
      cartCheckoutRepository.findById("abc-def").toCompletableFuture().get(1, TimeUnit.SECONDS);
    });

Testing with mocked Projection and SourceProvider

To test a handler in isolation you may want to mock out the implementation of a Projection or SourceProvider so that you don’t have to setup and teardown the associated technology as part of your integration test. For example, you may want to project against a Cassandra database, or read envelopes from an Akka Persistence journal source, but you don’t want to have to run Docker containers or embedded/in-memory services just to run your tests. The TestProjectionTestProjection allows you to isolate the runtime of your handler so that you don’t need to run these services. Using a TestProjection has the added benefit of being fast, since you can run everything within the JVM that runs your tests.

Alongside the TestProjection is the TestSourceProviderTestSourceProvider which can be used to provide test data to the TestProjection running the handler. Test data can be represented in an akka streams SourceSource that is passed to the TestSourceProvider constructor.

Scala
sourceimport akka.stream.scaladsl.Source

val testData = Source((0, "abc") :: (1, "def") :: Nil)

val extractOffset = (envelope: (Int, String)) => envelope._1

val sourceProvider = TestSourceProvider(testData, extractOffset)

val projection = TestProjection(ProjectionId("test", "00"), sourceProvider, () => handler)

projectionTestKit.run(projection) {
  // assert logic ..
}
Java
source
import akka.japi.Pair; import akka.stream.javadsl.Source; import akka.projection.testkit.javadsl.TestProjection; List<Pair<Integer, String>> testData = Stream.of(Pair.create(0, "abc"), Pair.create(1, "def")).collect(Collectors.toList()); Source<Pair<Integer, String>, NotUsed> source = Source.from(testData); Function<Pair<Integer, String>, Integer> extractOffsetFn = (Pair<Integer, String> env) -> env.first(); TestSourceProvider<Integer, Pair<Integer, String>> sourceProvider = TestSourceProvider.create(source, extractOffsetFn); Projection<Pair<Integer, String>> projection = TestProjection.create(ProjectionId.of("test", "00"), sourceProvider, () -> handler); projectionTestKit.run( projection, () -> { // assert logic ... });
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.