Testing
Module info
To use Akka Persistence TestKit, add the module to your project:
- sbt
val AkkaVersion = "2.6.21" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion, "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test )
- Maven
- Gradle
Project Info: Akka Persistence Testkit | |
---|---|
Artifact | com.typesafe.akka
akka-persistence-testkit
2.6.21
|
JDK versions | Adopt OpenJDK 8 Adopt OpenJDK 11 |
Scala versions | 2.13.8, 2.12.16, 3.1.2 |
JPMS module name | akka.persistence.testkit |
License | |
Readiness level |
Since 2.6.5, 2020-04-30
|
Home page | https://akka.io/ |
API documentation | |
Forums | |
Release notes | akka.io blog |
Issues | Github issues |
Sources | https://github.com/akka/akka |
Unit testing
Note! The EventSourcedBehaviorTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Unit testing of EventSourcedBehavior
can be done with the EventSourcedBehaviorTestKit
. It supports running one command at a time and you can assert that the synchronously returned result is as expected. The result contains the events emitted by the command and the new state after applying the events. It also has support for verifying the reply to a command.
You need to configure the ActorSystem
with the EventSourcedBehaviorTestKit.config
. The configuration enables the in-memory journal and snapshot storage.
- Scala
-
source
class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config)
- Java
A full test for the AccountEntity
, which is shown in the Persistence Style Guide, may look like this:
- Scala
-
source
import akka.Done import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit import akka.persistence.typed.PersistenceId import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.pattern.StatusReply import org.scalatest.BeforeAndAfterEach import org.scalatest.wordspec.AnyWordSpecLike class AccountExampleDocSpec extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config) with AnyWordSpecLike with BeforeAndAfterEach with LogCapturing { private val eventSourcedTestKit = EventSourcedBehaviorTestKit[AccountEntity.Command, AccountEntity.Event, AccountEntity.Account]( system, AccountEntity("1", PersistenceId("Account", "1"))) override protected def beforeEach(): Unit = { super.beforeEach() eventSourcedTestKit.clear() } "Account" must { "be created with zero balance" in { val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) result.reply shouldBe StatusReply.Ack result.event shouldBe AccountEntity.AccountCreated result.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 0 } "handle Withdraw" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) val result1 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) result1.reply shouldBe StatusReply.Ack result1.event shouldBe AccountEntity.Deposited(100) result1.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 100 val result2 = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(10, _)) result2.reply shouldBe StatusReply.Ack result2.event shouldBe AccountEntity.Withdrawn(10) result2.stateOfType[AccountEntity.OpenedAccount].balance shouldBe 90 } "reject Withdraw overdraft" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Withdraw(110, _)) result.reply.isError shouldBe true result.hasNoEvents shouldBe true } "handle GetBalance" in { eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.CreateAccount(_)) eventSourcedTestKit.runCommand[StatusReply[Done]](AccountEntity.Deposit(100, _)) val result = eventSourcedTestKit.runCommand[AccountEntity.CurrentBalance](AccountEntity.GetBalance(_)) result.reply.balance shouldBe 100 result.hasNoEvents shouldBe true } } }
- Java
Serialization of commands, events and state are verified automatically. The serialization checks can be customized with the SerializationSettings
when creating the EventSourcedBehaviorTestKit
. By default, the serialization roundtrip is checked but the equality of the result of the serialization is not checked. equals
must be implemented (or using case class
) in the commands, events and state if verifyEquality
is enabled.
To test recovery the restart
method of the EventSourcedBehaviorTestKit
can be used. It will restart the behavior, which will then recover from stored snapshot and events from previous commands. It’s also possible to populate the storage with events or simulate failures by using the underlying PersistenceTestKit
.
Persistence TestKit
Note! The PersistenceTestKit
is a new feature, api may have changes breaking source compatibility in future versions.
Persistence testkit allows to check events saved in a storage, emulate storage operations and exceptions. To use the testkit you need to add the following dependency in your project:
- sbt
val AkkaVersion = "2.6.21" libraryDependencies += "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion
- Maven
- Gradle
There are two testkit classes which have similar api:
PersistenceTestKit
class is for eventsSnapshotTestKit
class is for snapshots
The testkit classes have two corresponding plugins which emulate the behavior of the storages:
PersistenceTestKitPlugin
class emulates a events storagePersistenceTestKitSnapshotPlugin
class emulates a snapshots storage
Note! The corresponding plugins must be configured in the actor system which is used to initialize the particular testkit class:
- Scala
-
source
val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem(??? /*some behavior*/, "test-system", PersistenceTestKitPlugin.config.withFallback(yourConfiguration)) val testKit = PersistenceTestKit(system) - Java
and
- Scala
-
source
val yourConfiguration = ConfigFactory.defaultApplication() val system = ActorSystem( ??? /*some behavior*/, "test-system", PersistenceTestKitSnapshotPlugin.config.withFallback(yourConfiguration)) val testKit = SnapshotTestKit(system) - Java
A typical scenario is to create a persistent actor, send commands to it and check that it persists events as it is expected:
- Scala
-
source
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.scaladsl.PersistenceTestKit class PersistenceTestKitSampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() } "Persistent actor" should { "persist all events" in { val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) val cmd = Cmd("data") persistentActor ! cmd val expectedPersistedEvent = Evt(cmd.data) persistenceTestKit.expectNextPersisted(persistenceId.id, expectedPersistedEvent) } } }
- Java
You can safely use persistence testkit in combination with main akka testkit.
The main methods of the api allow to (see PersistenceTestKit
and SnapshotTestKit
for more details):
- check if the given event/snapshot object is the next persisted in the storage.
- read a sequence of persisted events/snapshots.
- check that no events/snapshots have been persisted in the storage.
- throw the default exception from the storage on attempt to persist, read or delete the following event/snapshot.
- clear the events/snapshots persisted in the storage.
- reject the events, but not snapshots (rejections are not supported for snapshots in the original api).
- set your own policy which emulates the work of the storage. Policy determines what to do when persistence needs to execute some operation on the storage (i.e. read, delete, etc.).
- get all the events/snapshots persisted in the storage
- put the events/snapshots in the storage to test recovery
Setting your own policy for the storage
You can implement and set your own policy for the storage to control its actions on particular operations, for example you can fail or reject events on your own conditions. Implement the ProcessingPolicy[EventStorage.JournalOperation]
trait for event storage or ProcessingPolicy[SnapshotStorage.SnapshotOperation]
trait for snapshot storage, and set it with withPolicy()
method.
- Scala
-
source
class PersistenceTestKitSampleSpecWithPolicy extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config.withFallback(ConfigFactory.defaultApplication())) with AnyWordSpecLike with BeforeAndAfterEach { val persistenceTestKit = PersistenceTestKit(system) override def beforeEach(): Unit = { persistenceTestKit.clearAll() persistenceTestKit.resetPolicy() } "Testkit policy" should { "fail all operations with custom exception" in { val policy = new EventStorage.JournalPolicies.PolicyType { class CustomFailure extends RuntimeException override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = processingUnit match { case WriteEvents(_) => StorageFailure(new CustomFailure) case _ => ProcessingSuccess } } persistenceTestKit.withPolicy(policy) val persistenceId = PersistenceId.ofUniqueId("your-persistence-id") val persistentActor = spawn( EventSourcedBehavior[Cmd, Evt, State]( persistenceId, emptyState = State.empty, commandHandler = (_, cmd) => Effect.persist(Evt(cmd.data)), eventHandler = (state, evt) => state.updated(evt))) persistentActor ! Cmd("data") persistenceTestKit.expectNothingPersisted(persistenceId.id) } } }
- Java
tryProcess()
method of the ProcessingPolicy
has two arguments: persistence id and the storage operation.
Event storage has the following operations:
ReadEvents
Read the events from the storage.WriteEvents
Write the events to the storage.DeleteEvents
Delete the events from the storage.ReadSeqNum
Read the highest sequence number for particular persistence id.
Snapshot storage has the following operations:
ReadSnapshot
Read the snapshot from the storage.WriteSnapshot
Writhe the snapshot to the storage.DeleteSnapshotsByCriteria
Delete snapshots in the storage by criteria.DeleteSnapshotByMeta
Delete particular snapshot from the storage by its metadata.
The tryProcess()
method must return one of the processing results:
ProcessingSuccess
Successful completion of the operation. All the events will be saved/read/deleted.StorageFailure
Emulates exception from the storage.Reject
Emulates rejection from the storage.
Note that snapshot storage does not have rejections. If you return Reject
in the tryProcess()
of the snapshot storage policy, it will have the same effect as the StorageFailure
.
Here is an example of the policy for an event storage:
- Scala
-
source
import akka.persistence.testkit._ class SampleEventStoragePolicy extends EventStorage.JournalPolicies.PolicyType { //you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: JournalOperation): ProcessingResult = if (count < 10) { count += 1 //check the type of operation and react with success or with reject or with failure. //if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadEvents(batch) if batch.nonEmpty => ProcessingSuccess case WriteEvents(batch) if batch.size > 1 => ProcessingSuccess case ReadSeqNum => StorageFailure() case DeleteEvents(_) => Reject() case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
Here is an example of the policy for a snapshot storage:
- Scala
-
source
class SampleSnapshotStoragePolicy extends SnapshotStorage.SnapshotPolicies.PolicyType { //you can use internal state, it does not need to be thread safe var count = 1 override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = if (count < 10) { count += 1 //check the type of operation and react with success or with reject or with failure. //if you return ProcessingSuccess the operation will be performed, otherwise not. processingUnit match { case ReadSnapshot(_, payload) if payload.nonEmpty => ProcessingSuccess case WriteSnapshot(meta, payload) if meta.sequenceNr > 10 => ProcessingSuccess case DeleteSnapshotsByCriteria(_) => StorageFailure() case DeleteSnapshotByMeta(meta) if meta.sequenceNr < 10 => ProcessingSuccess case _ => StorageFailure() } } else { ProcessingSuccess } }
- Java
Configuration of Persistence TestKit
There are several configuration properties for persistence testkit, please refer to the reference configuration
Integration testing
EventSourcedBehavior
actors can be tested with the ActorTestKit together with other actors. The in-memory journal and snapshot storage from the Persistence TestKit can be used also for integration style testing of a single ActorSystem
, for example when using Cluster Sharding with a single Cluster node.
For tests that involve more than one Cluster node you have to use another journal and snapshot store. While it’s possible to use the Persistence Plugin Proxy it’s often better and more realistic to use a real database.
The CQRS example includes tests that are using Akka Persistence Cassandra.
Plugin initialization
Some Persistence plugins create tables automatically, but has the limitation that it can’t be done concurrently from several ActorSystems. That can be a problem if the test creates a Cluster and all nodes tries to initialize the plugins at the same time. To coordinate initialization you can use the PersistenceInit
utility.
PersistenceInit
is part of akka-persistence-testkit
and you need to add the dependency to your project:
- sbt
val AkkaVersion = "2.6.21" libraryDependencies += "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion
- Maven
- Gradle
- Scala
-
source
import akka.persistence.testkit.scaladsl.PersistenceInit import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ val timeout = 5.seconds val done: Future[Done] = PersistenceInit.initializeDefaultPlugins(system, timeout) Await.result(done, timeout)
- Java