Writing tests for a Projection

Like other Akka libraries, Projections ships with a TestKit that a user can include to assert the correctness of their Projection handler implementation. Add the Projections TestKit dependency to your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-testkit" % "1.1.0"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-testkit_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-testkit_${versions.ScalaBinary}:1.1.0"
}

Import the ProjectionTestKit and other utilities into a new ScalaTest test spec (Scala) or JUnit test (Java).

Scala
import akka.projection.testkit.scaladsl.ProjectionTestKit
import akka.projection.testkit.scaladsl.TestProjection
import akka.projection.testkit.scaladsl.TestSourceProvider
Java
import akka.projection.testkit.javadsl.ProjectionTestKit;
import akka.projection.testkit.javadsl.TestProjection;
import akka.projection.testkit.javadsl.TestSourceProvider;

The TestKit includes several utilities to run the Projection handler in isolation so that a full projection implementation and source provider are not required.

Using these tools we can assert that the ItemPopularityProjectionHandler meets the following requirements:

  1. Process each shopping cart item event, correctly calculate the item count delta, and update the database.
  2. Log the popularity of every 10th shopping cart item event that is processed.
Scala
package docs.guide

import java.time.Instant

import scala.concurrent.Future

import akka.Done
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.query.Offset
import akka.projection.ProjectionId
import akka.projection.eventsourced.EventEnvelope
import akka.stream.scaladsl.Source
import akka.projection.testkit.scaladsl.ProjectionTestKit
import akka.projection.testkit.scaladsl.TestProjection
import akka.projection.testkit.scaladsl.TestSourceProvider
import org.scalatest.wordspec.AnyWordSpecLike

object ShoppingCartAppSpec {

  /**
   * In-memory replacement for the Cassandra-backed repository.
   *
   * Item count updates are accumulated in [[counts]] so that a test can inspect
   * the totals once the projection has processed its events.
   */
  class MockItemPopularityRepository extends ItemPopularityProjectionRepository {
    // running total per item id; reassigned on every update
    var counts: Map[String, Long] = Map.empty

    /** Adds `delta` to the current total for `itemId` (a missing item starts at 0). */
    override def update(itemId: String, delta: Int): Future[Done] = {
      val newTotal = counts.getOrElse(itemId, 0L) + delta
      counts = counts.updated(itemId, newTotal)
      Future.successful(Done)
    }

    /** Looks up the recorded total for `itemId`, if any update has been seen for it. */
    override def getItem(itemId: String): Future[Option[Long]] = {
      val recorded = counts.get(itemId)
      Future.successful(recorded)
    }
  }
}

/**
 * Runs the `ItemPopularityProjectionHandler` through the Projections TestKit,
 * feeding it envelopes from an in-memory source and recording updates in a
 * `MockItemPopularityRepository`.
 */
class ShoppingCartAppSpec extends ScalaTestWithActorTestKit() with AnyWordSpecLike {
  import ShoppingCartAppSpec._

  // shorthand for the envelope type that every test streams through the handler
  private type CartEnvelope = EventEnvelope[ShoppingCartEvents.Event]

  private val projectionTestKit = ProjectionTestKit(system)

  /** Wraps `event` in an envelope whose offset and sequence number are both `seqNo`. */
  def createEnvelope(
      event: ShoppingCartEvents.Event,
      seqNo: Long,
      timestamp: Long = 0L): EventEnvelope[ShoppingCartEvents.Event] =
    EventEnvelope(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp)

  "The ItemPopularityProjectionHandler" should {
    "process item events correctly" in {
      val repository = new MockItemPopularityRepository
      val handlerUnderTest = new ItemPopularityProjectionHandler("tag", system, repository)

      // two carts: the first is adjusted and checked out, the second adds and removes an item
      val envelopes = List[CartEnvelope](
        createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L),
        createEnvelope(ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1), 1L),
        createEnvelope(ShoppingCartEvents.CheckedOut("a7098", Instant.parse("2020-01-01T12:00:00.00Z")), 2L),
        createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "akka t-shirt", 1), 3L),
        createEnvelope(ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L),
        createEnvelope(ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L),
        createEnvelope(ShoppingCartEvents.CheckedOut("0d12d", Instant.parse("2020-01-01T12:05:00.00Z")), 6L))

      val provider =
        TestSourceProvider[Offset, CartEnvelope](Source(envelopes), extractOffset = envelope => envelope.offset)
      val testProjection =
        TestProjection[Offset, CartEnvelope](ProjectionId("name", "key"), provider, () => handlerUnderTest)

      projectionTestKit.run(testProjection) {
        repository.counts shouldBe Map("bowling shoes" -> 2, "akka t-shirt" -> 1, "skis" -> 0)
      }
    }

    "log item popularity for day every 10 item events" in {
      val repository = new MockItemPopularityRepository
      val handlerUnderTest = new ItemPopularityProjectionHandler("tag", system, repository)

      // ten additions of the same item, one per sequence number
      val envelopes =
        for (seqNo <- 0L until 10L)
          yield createEnvelope(ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), seqNo)

      val provider =
        TestSourceProvider[Offset, CartEnvelope](Source(envelopes), extractOffset = envelope => envelope.offset)
      val testProjection =
        TestProjection[Offset, CartEnvelope](ProjectionId("name", "key"), provider, () => handlerUnderTest)

      val expectedLogLine =
        LoggingTestKit.info("ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")

      expectedLogLine.expect {
        projectionTestKit.runWithTestSink(testProjection) { probe =>
          // pull every envelope through the projection so the tenth event is handled
          val expectedCount = envelopes.length
          probe.request(expectedCount)
          probe.expectNextN(expectedCount)
        }
      }
    }
  }
}
Java
package jdocs.guide;

import akka.Done;
import akka.NotUsed;
import akka.actor.testkit.typed.javadsl.LoggingTestKit;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.persistence.query.Offset;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.EventEnvelope;
import akka.projection.javadsl.Handler;
import akka.projection.javadsl.SourceProvider;
import akka.projection.testkit.javadsl.ProjectionTestKit;
import akka.projection.testkit.javadsl.TestProjection;
import akka.projection.testkit.javadsl.TestSourceProvider;
import akka.stream.javadsl.Source;
import org.junit.ClassRule;
import org.junit.Test;

import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;

/**
 * Runs the {@code ItemPopularityProjectionHandler} through the Projections TestKit, feeding it
 * envelopes from an in-memory source and recording updates in a {@link
 * MockItemPopularityRepository}.
 */
public class ShoppingCartAppTest {
  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource();
  public static final ProjectionTestKit projectionTestKit =
      ProjectionTestKit.create(testKit.system());

  /**
   * Wraps {@code event} in an envelope whose offset and sequence number are both {@code seqNo}.
   *
   * @param event the shopping cart event to wrap
   * @param seqNo used for both the sequence offset and the envelope's sequence number
   * @param timestamp timestamp stored on the envelope
   * @return the envelope, with the fixed persistence id {@code "persistenceId"}
   */
  EventEnvelope<ShoppingCartEvents.Event> createEnvelope(
      ShoppingCartEvents.Event event, Long seqNo, Long timestamp) {
    return EventEnvelope.create(Offset.sequence(seqNo), "persistenceId", seqNo, event, timestamp);
  }

  /** Streams events for two carts through the handler and checks the recorded item totals. */
  @Test
  public void projectionHandlerShouldProcessItemEventsCorrectly() {
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.from(
            Arrays.asList(
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1), 0L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemQuantityAdjusted("a7098", "bowling shoes", 2, 1),
                    1L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "a7098", Instant.parse("2020-01-01T12:00:00.00Z")),
                    2L,
                    0L),
                createEnvelope(
                    new ShoppingCartEvents.ItemAdded("0d12d", "akka t-shirt", 1), 3L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemAdded("0d12d", "skis", 1), 4L, 0L),
                createEnvelope(new ShoppingCartEvents.ItemRemoved("0d12d", "skis", 1), 5L, 0L),
                createEnvelope(
                    new ShoppingCartEvents.CheckedOut(
                        "0d12d", Instant.parse("2020-01-01T12:05:00.00Z")),
                    6L,
                    0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    projectionTestKit.run(
        projection,
        () -> {
          // JUnit's contract is assertEquals(expected, actual); keeping that order makes
          // failure messages report the right value as "expected".
          assertEquals(3, repo.counts.size());
          assertEquals(Long.valueOf(2L), repo.counts.get("bowling shoes"));
          assertEquals(Long.valueOf(1L), repo.counts.get("akka t-shirt"));
          assertEquals(Long.valueOf(0L), repo.counts.get("skis"));
        });
  }

  /** Streams ten item events through the handler and expects the popularity log line. */
  @Test
  public void projectionHandlerShouldLogItemPopularityEvery10Events() {
    long eventsNum = 10L;
    MockItemPopularityRepository repo = new MockItemPopularityRepository();
    Handler<EventEnvelope<ShoppingCartEvents.Event>> handler =
        new ItemPopularityProjectionHandler("tag", testKit.system(), repo);

    // one ItemAdded envelope per sequence number in [0, eventsNum)
    Source<EventEnvelope<ShoppingCartEvents.Event>, NotUsed> events =
        Source.fromJavaStream(
            () ->
                IntStream.range(0, (int) eventsNum)
                    .mapToObj(
                        i ->
                            createEnvelope(
                                new ShoppingCartEvents.ItemAdded("a7098", "bowling shoes", 1),
                                (long) i,
                                0L)));

    ProjectionId projectionId = ProjectionId.of("name", "key");
    SourceProvider<Offset, EventEnvelope<ShoppingCartEvents.Event>> sourceProvider =
        TestSourceProvider.create(events, env -> env.offset());
    TestProjection<Offset, EventEnvelope<ShoppingCartEvents.Event>> projection =
        TestProjection.create(projectionId, sourceProvider, () -> handler);

    LoggingTestKit.info(
            "ItemPopularityProjectionHandler(tag) item popularity for 'bowling shoes': [10]")
        .expect(
            testKit.system(),
            () -> {
              projectionTestKit.runWithTestSink(
                  projection,
                  testSink -> {
                    // pull every envelope through the projection so the tenth event is handled
                    testSink.request(eventsNum);
                    testSink.expectNextN(eventsNum);
                  });
              // LoggingTestKit.expect takes a Supplier<T>, so the lambda has to yield a value;
              // there is no meaningful result to return from this block.
              return null;
            });
  }

  /**
   * In-memory replacement for the Cassandra-backed repository. Item count updates are accumulated
   * in {@link #counts} so a test can inspect the totals afterwards.
   */
  static class MockItemPopularityRepository implements ItemPopularityProjectionRepository {
    // running total per item id
    public Map<String, Long> counts = new HashMap<>();

    /** Adds {@code delta} to the current total for {@code itemId} (a missing item starts at 0). */
    @Override
    public CompletionStage<Done> update(String itemId, int delta) {
      counts.merge(itemId, (long) delta, Long::sum);
      return CompletableFuture.completedFuture(Done.getInstance());
    }

    /** Returns the recorded total for {@code itemId}, or empty when no update has been seen. */
    @Override
    public CompletionStage<Optional<Long>> getItem(String itemId) {
      return CompletableFuture.completedFuture(Optional.ofNullable(counts.get(itemId)));
    }
  }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.