Testing

Dependency

To use Akka Persistence and Actor TestKit, add the module to your project:

sbt
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-persistence-typed" % "2.6.4+44-0933d745",
  "com.typesafe.akka" %% "akka-actor-testkit-typed" % "2.6.4+44-0933d745" % Test
)
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-typed_2.12</artifactId>
  <version>2.6.4+44-0933d745</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor-testkit-typed_2.12</artifactId>
  <version>2.6.4+44-0933d745</version>
  <scope>test</scope>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-persistence-typed_2.12', version: '2.6.4+44-0933d745',
  test group: 'com.typesafe.akka', name: 'akka-actor-testkit-typed_2.12', version: '2.6.4+44-0933d745'
}

Unit testing

Unit testing of EventSourcedBehavior can be done with the ActorTestKit in the same way as other behaviors.

Synchronous behavior testing for EventSourcedBehavior is not supported yet, but tracked in issue #23712.

You need to configure a journal, and the in-memory journal is sufficient for unit testing. To enable the in-memory journal you need to pass the following configuration to the ScalaTestWithActorTestKitTestKitJunitResource.

Scala
""" 
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.persistence.journal.inmem.test-serialization = on
"""
Java
private static final String inmemConfig =
    "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
        + "akka.persistence.journal.inmem.test-serialization = on \n";

The test-serialization = on configuration of the InmemJournal will verify that persisted events can be serialized and deserialized.

Optionally you can also configure a snapshot store. To enable the file based snapshot store you need to pass the following configuration to the ScalaTestWithActorTestKitTestKitJunitResource.

Scala
s""" 
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
"""
Java
private static final String snapshotConfig =
    "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n"
        + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-"
        + UUID.randomUUID().toString()
        + "\" \n";

Then you can spawn the EventSourcedBehavior and verify the outcome of sending commands to the actor using the facilities of the ActorTestKit.

A full test for the AccountEntity, which is shown in the Persistence Style Guide, may look like this:

Scala
import java.util.UUID

import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike

class AccountExampleDocSpec extends ScalaTestWithActorTestKit(s"""
      akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
      akka.persistence.snapshot-store.local.dir = "target/snapshot-${UUID.randomUUID().toString}"
    """) with AnyWordSpecLike with LogCapturing {

  "Account" must {

    "handle Withdraw" in {
      val probe = createTestProbe[AccountEntity.OperationResult]()
      val ref = spawn(AccountEntity("1", PersistenceId("Account", "1")))
      ref ! AccountEntity.CreateAccount(probe.ref)
      probe.expectMessage(AccountEntity.Confirmed)
      ref ! AccountEntity.Deposit(100, probe.ref)
      probe.expectMessage(AccountEntity.Confirmed)
      ref ! AccountEntity.Withdraw(10, probe.ref)
      probe.expectMessage(AccountEntity.Confirmed)
    }

    "reject Withdraw overdraft" in {
      val probe = createTestProbe[AccountEntity.OperationResult]()
      val ref = spawn(AccountEntity("2", PersistenceId("Account", "2")))
      ref ! AccountEntity.CreateAccount(probe.ref)
      probe.expectMessage(AccountEntity.Confirmed)
      ref ! AccountEntity.Deposit(100, probe.ref)
      probe.expectMessage(AccountEntity.Confirmed)
      ref ! AccountEntity.Withdraw(110, probe.ref)
      probe.expectMessageType[AccountEntity.Rejected]
    }

    "handle GetBalance" in {
      val opProbe = createTestProbe[AccountEntity.OperationResult]()
      val ref = spawn(AccountEntity("3", PersistenceId("Account", "3")))
      ref ! AccountEntity.CreateAccount(opProbe.ref)
      opProbe.expectMessage(AccountEntity.Confirmed)
      ref ! AccountEntity.Deposit(100, opProbe.ref)
      opProbe.expectMessage(AccountEntity.Confirmed)

      val getProbe = createTestProbe[AccountEntity.CurrentBalance]()
      ref ! AccountEntity.GetBalance(getProbe.ref)
      getProbe.expectMessage(AccountEntity.CurrentBalance(100))
    }

  }
}
Java
import java.math.BigDecimal;
import java.util.UUID;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;

import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.PersistenceId;

public class AccountExampleDocTest
{

  private static final String inmemConfig =
      "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"
          + "akka.persistence.journal.inmem.test-serialization = on \n";


  private static final String snapshotConfig =
      "akka.persistence.snapshot-store.plugin = \"akka.persistence.snapshot-store.local\" \n"
          + "akka.persistence.snapshot-store.local.dir = \"target/snapshot-"
          + UUID.randomUUID().toString()
          + "\" \n";

  private static final String config = inmemConfig + snapshotConfig;

  @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);

  @Rule public final LogCapturing logCapturing = new LogCapturing();

  @Test
  public void handleWithdraw() {
    ActorRef<AccountEntity.Command> ref =
        testKit.spawn(AccountEntity.create("1", PersistenceId.of("Account", "1")));
    TestProbe<AccountEntity.OperationResult> probe =
        testKit.createTestProbe(AccountEntity.OperationResult.class);
    ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
    probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
    ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
    probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
    ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(10), probe.getRef()));
    probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
  }

  @Test
  public void rejectWithdrawOverdraft() {
    ActorRef<AccountEntity.Command> ref =
        testKit.spawn(AccountEntity.create("2", PersistenceId.of("Account", "2")));
    TestProbe<AccountEntity.OperationResult> probe =
        testKit.createTestProbe(AccountEntity.OperationResult.class);
    ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
    probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
    ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
    probe.expectMessage(AccountEntity.Confirmed.INSTANCE);
    ref.tell(new AccountEntity.Withdraw(BigDecimal.valueOf(110), probe.getRef()));
    probe.expectMessageClass(AccountEntity.Rejected.class);
  }

  @Test
  public void handleGetBalance() {
    ActorRef<AccountEntity.Command> ref =
        testKit.spawn(AccountEntity.create("3", PersistenceId.of("Account", "3")));
    TestProbe<AccountEntity.OperationResult> opProbe =
        testKit.createTestProbe(AccountEntity.OperationResult.class);
    ref.tell(new AccountEntity.CreateAccount(opProbe.getRef()));
    opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE);
    ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), opProbe.getRef()));
    opProbe.expectMessage(AccountEntity.Confirmed.INSTANCE);

    TestProbe<AccountEntity.CurrentBalance> getProbe =
        testKit.createTestProbe(AccountEntity.CurrentBalance.class);
    ref.tell(new AccountEntity.GetBalance(getProbe.getRef()));
    assertEquals(
        BigDecimal.valueOf(100),
        getProbe.expectMessageClass(AccountEntity.CurrentBalance.class).balance);
  }

}

Note that each test case is using a different PersistenceId to not interfere with each other.

The InmemJournalInmemJournal publishes Write and Delete operations to the eventStream, which makes it possible to verify that the expected events have been emitted and stored by the EventSourcedBehavior. You can subscribe to to the eventStream with a TestProbe like this:

Scala
import akka.persistence.journal.inmem.InmemJournal
import akka.actor.typed.eventstream.EventStream

"store events" in {
  val eventProbe = createTestProbe[InmemJournal.Operation]()
  system.eventStream ! EventStream.Subscribe(eventProbe.ref)

  val probe = createTestProbe[AccountEntity.OperationResult]()
  val ref = spawn(AccountEntity("4", PersistenceId("Account", "4")))
  ref ! AccountEntity.CreateAccount(probe.ref)
  eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.AccountCreated)

  ref ! AccountEntity.Deposit(100, probe.ref)
  probe.expectMessage(AccountEntity.Confirmed)
  eventProbe.expectMessageType[InmemJournal.Write].event should ===(AccountEntity.Deposited(100))
}
Java
import akka.actor.typed.eventstream.EventStream;
import akka.persistence.journal.inmem.InmemJournal;

@Test
public void storeEvents() {
  TestProbe<InmemJournal.Operation> eventProbe = testKit.createTestProbe();
  testKit
      .system()
      .eventStream()
      .tell(new EventStream.Subscribe<>(InmemJournal.Operation.class, eventProbe.getRef()));

  ActorRef<AccountEntity.Command> ref =
      testKit.spawn(AccountEntity.create("4", PersistenceId.of("Account", "4")));
  TestProbe<AccountEntity.OperationResult> probe =
      testKit.createTestProbe(AccountEntity.OperationResult.class);
  ref.tell(new AccountEntity.CreateAccount(probe.getRef()));
  assertEquals(
      AccountEntity.AccountCreated.INSTANCE,
      eventProbe.expectMessageClass(InmemJournal.Write.class).event());

  ref.tell(new AccountEntity.Deposit(BigDecimal.valueOf(100), probe.getRef()));
  assertEquals(
      BigDecimal.valueOf(100),
      ((AccountEntity.Deposited) eventProbe.expectMessageClass(InmemJournal.Write.class).event())
          .amount);
}

Persistence TestKit

Note! The testkit is a new feature, api may have changes breaking source compatibility in future versions.

Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-persistence-testkit" % "2.6.4+44-0933d745"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence-testkit_2.12</artifactId>
  <version>2.6.4+44-0933d745</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-persistence-testkit_2.12', version: '2.6.4+44-0933d745'
}

There are two testkit classes which have similar api:

The testkit classes have two corresponding plugins which emulate the behavior of the storages:

Note! The corresponding plugins must be configured in the actor system which is used to initialize the particular testkit class:

Scala

val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem(??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration)) val testKit = PersistenceTestKit(system)
Java
public class PersistenceTestKitConfig {

  Config conf =
      PersistenceTestKitPlugin.getInstance()
          .config()
          .withFallback(ConfigFactory.defaultApplication());

  ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);

  PersistenceTestKit testKit = PersistenceTestKit.create(system);
}

and

Scala

val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration)) val testKit = SnapshotTestKit(system)
Java
public class SnapshotTestKitConfig {

  Config conf =
      PersistenceTestKitSnapshotPlugin.getInstance()
          .config()
          .withFallback(ConfigFactory.defaultApplication());

  ActorSystem<Command> system = ActorSystem.create(new SomeBehavior(), "example", conf);

  SnapshotTestKit testKit = SnapshotTestKit.create(system);
}

A typical scenario is to create a persistent actor, send commands to it and check that it persists events as it is expected:

Scala
class TypedSampleSpec extends AnyWordSpecLike with BeforeAndAfterAll {

  val system: ActorSystem[Cmd] = ActorSystem(
    EventSourcedBehavior[Cmd, Evt, State](
      persistenceId = ???,
      eventHandler = ???,
      commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)),
      emptyState = ???),
    "name",
    PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication()))
  val persistenceTestKit = PersistenceTestKit(system)

  override def beforeAll(): Unit =
    persistenceTestKit.clearAll()

  "Persistent actor" should {

    "persist all events" in {

      val persistentActor = system
      val cmd = Cmd("data")

      persistentActor ! cmd

      val expectedPersistedEvent = Evt(cmd.data)
      persistenceTestKit.expectNextPersisted("your-persistence-id", expectedPersistedEvent)
    }

  }
}
Java
class SampleTest {

  @ClassRule
  public static final TestKitJunitResource testKit =
      new TestKitJunitResource(
          PersistenceTestKitPlugin.getInstance()
              .config()
              .withFallback(ConfigFactory.defaultApplication()));

  PersistenceTestKit persistenceTestKit = PersistenceTestKit.create(testKit.system());

  @Before
  void beforeAll() {
    persistenceTestKit.clearAll();
  }

  @Test
  void test() {
    ActorRef<Cmd> ref =
        testKit.spawn(new YourPersistentBehavior(PersistenceId.ofUniqueId("some-id")));

    Cmd cmd = new Cmd("data");
    ref.tell(cmd);
    Evt expectedEventPersisted = new Evt(cmd.data);

    persistenceTestKit.expectNextPersisted("your-persistence-id", expectedEventPersisted);
  }
}

final class Cmd {

  public final String data;

  public Cmd(String data) {
    this.data = data;
  }
}

final class Evt {

  public final String data;

  public Evt(String data) {
    this.data = data;
  }
}

final class State {}

class YourPersistentBehavior extends EventSourcedBehavior<Cmd, Evt, State> {

  public YourPersistentBehavior(PersistenceId persistenceId) {
    super(persistenceId);
  }

  @Override
  public State emptyState() {
    // some state
    return new State();
  }

  @Override
  public CommandHandler<Cmd, Evt, State> commandHandler() {
    return newCommandHandlerBuilder()
        .forAnyState()
        .onCommand(Cmd.class, command -> Effect().persist(new Evt(command.data)))
        .build();
  }

  @Override
  public EventHandler<State, Evt> eventHandler() {
    // TODO handle events
    return newEventHandlerBuilder().build();
  }
}

You can safely use persistence testkit in combination with main akka testkit.

The main methods of the api allow to (see PersistenceTestKitPersistenceTestKit and SnapshotTestKitSnapshotTestKit for more details):

  • check if the given event/snapshot object is the next persisted in the storage.
  • read a sequence of persisted events/snapshots.
  • check that no events/snapshots have been persisted in the storage.
  • throw the default exception from the storage on attempt to persist, read or delete the following event/snapshot.
  • clear the events/snapshots persisted in the storage.
  • reject the events, but not snapshots (rejections are not supported for snapshots in the original api).
  • set your own policy which emulates the work of the storage. Policy determines what to do when persistence needs to execute some operation on the storage (i.e. read, delete, etc.).
  • get all the events/snapshots persisted in the storage
  • put the events/snapshots in the storage to test recovery

Setting your own policy for the storage

You can implement and set your own policy for the storage to control its actions on particular operations, for example you can fail or reject events on your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation]ProcessingPolicy<EventStorage.JournalOperation> traitinterface for event storage or ProcessingPolicy[SnapshotStorage.SnapshotOperation]ProcessingPolicy<SnapshotStorage.SnapshotOperation> traitinterface for snapshot storage, and set it with withPolicy() method.

tryProcess() method of the ProcessingPolicyProcessingPolicy has two arguments: persistence id and the storage operation.

Event storage has the following operations:

Snapshot storage has the following operations:

The tryProcess() method must return one of the processing results:

Note that snapshot storage does not have rejections. If you return Reject in the tryProcess() of the snapshot storage policy, it will have the same effect as the StorageFailure.

Here is an example of the policy for an event storage:

Scala
class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType {

  //you can use internal state, it does not need to be thread safe
  var count = 1

  override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult =
    if (count < 10) {
      count += 1
      //check the type of operation and react with success or with reject or with failure.
      //if you return ProcessingSuccess the operation will be performed, otherwise not.
      processingUnit match {
        case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess
        case WriteEvents(batch) if batch.size > 1 =>
          ProcessingSuccess
        case ReadSeqNum      => StorageFailure()
        case DeleteEvents(_) => Reject()
        case _               => StorageFailure()
      }
    } else {
      ProcessingSuccess
    }

}
Java
class SampleEventStoragePolicy implements ProcessingPolicy<JournalOperation> {

  // you can use internal state, it does not need to be thread safe
  int count = 1;

  @Override
  public ProcessingResult tryProcess(String persistenceId, JournalOperation processingUnit) {
    // check the type of operation and react with success or with reject or with failure.
    // if you return ProcessingSuccess the operation will be performed, otherwise not.
    if (count < 10) {
      count += 1;
      if (processingUnit instanceof ReadEvents) {
        ReadEvents read = (ReadEvents) processingUnit;
        if (read.batch().nonEmpty()) {
          ProcessingSuccess.getInstance();
        } else {
          return StorageFailure.create();
        }
      } else if (processingUnit instanceof WriteEvents) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteEvents) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit.equals(ReadSeqNum.getInstance())) {
        return Reject.create();
      }
      // you can set your own exception
      return StorageFailure.create(new RuntimeException("your exception"));
    } else {
      return ProcessingSuccess.getInstance();
    }
  }
}

Here is an example of the policy for a snapshot storage:

Scala
class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {

  //you can use internal state, it does not need to be thread safe
  var count = 1

  override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult =
    if (count < 10) {
      count += 1
      //check the type of operation and react with success or with reject or with failure.
      //if you return ProcessingSuccess the operation will be performed, otherwise not.
      processingUnit match {
        case ReadSnapshot(_, payload) if payload.nonEmpty =>
          ProcessingSuccess
        case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 =>
          ProcessingSuccess
        case DeleteSnapshotsByCriteria(_) => StorageFailure()
        case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 =>
          ProcessingSuccess
        case _ => StorageFailure()
      }
    } else {
      ProcessingSuccess
    }
}
Java
class SnapshotStoragePolicy implements ProcessingPolicy<SnapshotOperation> {

  // you can use internal state, it doesn't need to be thread safe
  int count = 1;

  @Override
  public ProcessingResult tryProcess(String persistenceId, SnapshotOperation processingUnit) {
    // check the type of operation and react with success or with failure.
    // if you return ProcessingSuccess the operation will be performed, otherwise not.
    if (count < 10) {
      count += 1;
      if (processingUnit instanceof ReadSnapshot) {
        ReadSnapshot read = (ReadSnapshot) processingUnit;
        if (read.getSnapshot().isPresent()) {
          ProcessingSuccess.getInstance();
        } else {
          return StorageFailure.create();
        }
      } else if (processingUnit instanceof WriteSnapshot) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteSnapshotsByCriteria) {
        return ProcessingSuccess.getInstance();
      } else if (processingUnit instanceof DeleteSnapshotByMeta) {
        return ProcessingSuccess.getInstance();
      }
      // you can set your own exception
      return StorageFailure.create(new RuntimeException("your exception"));
    } else {
      return ProcessingSuccess.getInstance();
    }
  }
}

Configuration of Persistence TestKit

There are several configuration properties for persistence testkit, please refer to the reference configuration

Integration testing

The in-memory journal and file based snapshot store can be used also for integration style testing of a single ActorSystem, for example when using Cluster Sharding with a single Cluster node.

For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.

See akka-samples issue #128.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.