Testing
The akka-persistence-multi-dc-testkit
module provides a Simulator
for unit testing replicated entities without using a database. Fine-grained control is available for inspecting side effects and simulating various scenario’s and replication orderings.
For more realistic but more heavy ‘integration test’-style tests you might want to consider writing a Multi JVM Test in addition to simulator-based tests.
Dependency
To use the simulator and the Cassandra launcher described below you will need the akka-persistence-multi-dc-testkit
dependency:
- sbt
-
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html "com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % "1.1.16" % Test
- Gradle
-
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html dependencies { testCompile group: 'com.lightbend.akka', name: 'akka-persistence-multi-dc-testkit_2.11', version: '1.1.16' }
- Maven
-
<!-- Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html --> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-persistence-multi-dc-testkit_2.11</artifactId> <version>1.1.16</version> <scope>test</scope> </dependency>
Before you can access this library, you’ll need to configure the Lightbend repository and credentials in your build.
Basic simulator usage
A simple use of the simulator might look like this:
- Scala
-
import BuggyEntity._ val simulator = createSimulator(() => new BuggyEntity(), selfDc = allDcs.head, allDcs) simulator.runCommand(Append("Hello")) simulator.replicatedEvent(Appended(" World")) simulator.runCommand(Append("foo"), Append("bar")) simulator.state() should ===("Hello Worldfoobar") simulator.events().length should ===(4)
- Java
-
Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity()); simulator.runCommand(new Commands.Append("Hello")); simulator.replicatedEvent(new Events.Appended(" World")); simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar")); assertEquals("Hello Worldfoobar", simulator.state()); assertEquals(4, simulator.getEvents().size());
The createSimulator
helper method is a convenience method that makes it easy to create a Simulator
under a mostly-empty ActorSystem
and makes sure its lifecycle is correctly tied into your test framework of choice. For example, under scalatestjunit the complete test might look like:
- Scala
-
import scala.concurrent.{ Future, Promise } import akka.actor.ActorRef import akka.persistence.multidc.scaladsl.{ Effect, EffectFactories, ReplicatedEntity } import org.scalatest.{ Matchers, WordSpecLike } class SimulatorSpec extends WordSpecLike with ScalatestSimulator with Matchers { import SimulatorSpec._ val allDcs = Set("DC-A", "DC-B") "The simulator" should { "allow writing a test controlling replication manually" in { import BuggyEntity._ val simulator = createSimulator(() => new BuggyEntity(), selfDc = allDcs.head, allDcs) simulator.runCommand(Append("Hello")) simulator.replicatedEvent(Appended(" World")) simulator.runCommand(Append("foo"), Append("bar")) simulator.state() should ===("Hello Worldfoobar") simulator.events().length should ===(4) } } }
- Java
-
import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.Simulator; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import static org.junit.Assert.assertEquals; public class BasicTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty()); } @Test public void testBasicSimulator() { Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity()); simulator.runCommand(new Commands.Append("Hello")); simulator.replicatedEvent(new Events.Appended(" World")); simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar")); assertEquals("Hello Worldfoobar", simulator.state()); assertEquals(4, simulator.getEvents().size()); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
For the sake of completeness, the entity under test looks like:
- Scala
-
/** * This entity is buggy because replica's will end up in different states when events are persisted * concurrently in different data centres. */ object BuggyEntity { sealed trait Command case class Append(string: String) extends Command case class Get(replyTo: ActorRef) extends Command sealed trait Event case class Appended(string: String) extends Event type State = String } class BuggyEntity extends ReplicatedEntity[BuggyEntity.Command, BuggyEntity.Event, BuggyEntity.State] { import BuggyEntity._ override val initialState = "" override def commandHandler = CommandHandler { (ctx, state, cmd) => cmd match { case Get(replyTo) => replyTo ! state Effect.none case Append(string) => Effect.persist(Appended(string)) } } override def eventHandler(state: State, event: Event): State = event match { case Appended(string) => state + string } }
- Java
-
import akka.persistence.multidc.javadsl.CommandHandler; import akka.persistence.multidc.javadsl.EventHandler; import akka.persistence.multidc.javadsl.ReplicatedEntity; /** * This entity is buggy because replica's will end up in different states when events are persisted * concurrently in different data centres. */ public class BuggyEntity extends ReplicatedEntity<Commands.Command, Events.Event, String> { @Override public String initialState() { return ""; } @Override public CommandHandler<Commands.Command, Events.Event, String> commandHandler() { return commandHandlerBuilder(Commands.Command.class) .matchCommand(Commands.Get.class, (ctx, state, get) -> { ctx.getSender().tell(state, ctx.getSelf()); return Effect().none(); }) .matchCommand(Commands.Append.class, (ctx, state, append) -> { return Effect().persist(new Events.Appended(append.string)); }) .build(); } @Override public EventHandler<Events.Event, String> eventHandler() { return eventHandlerBuilder(Events.Event.class) .matchEvent(Events.Appended.class, (state, appended) -> { return state + appended.string; }) .build(); } }
Testing side effects
To demonstrate some more advanced testing scenario’s, we introduce a ReliableDeliverer
actor implementation which we will test. This actor implements an “at-least-once” delivery: after accepting a task and persisting the intent of performing some async action, we immediately send a DeliveryScheduled
acknowledgement to the sender. Then we attempt to perform some async call, retrying in case of restarts, sending a final DeliveryAcknowledged
at a best-effort basis. Typically you might also want to trigger retries with a Timer
, but this has been left out of this example for brevity.
The actor under test looks like this:
- Scala
-
object ReliableDeliverer { sealed trait Command case class Deliver(payload: String) extends Command sealed abstract class InternalCommand extends Command case class ActionPerformed(payload: String) extends InternalCommand case object DeliveryScheduled case object DeliveryAcknowledged sealed trait Event case class Promised(payload: String) extends Event case class Acknowledged(payload: String) extends Event case class State(pendingPayloads: List[String]) } class ReliableDeliverer(useThisDcDuringRecovery: Boolean = true) extends ReplicatedEntity[ReliableDeliverer.Command, ReliableDeliverer.Event, ReliableDeliverer.State] { import akka.pattern.pipe import ReliableDeliverer._ // Transient mutable state (not part of the state managed by event sourcing) // which means it will be lost when the actor is restarted, and delivery acknowledgements // might go missing even though the delivery is in fact retried. var acknowledgementsTo: Map[String, ActorRef] = Map.empty override val initialState = State(List.empty) override def commandHandler = CommandHandler { (ctx, state, cmd) => cmd match { case Deliver(payload) => import ctx.dispatcher acknowledgementsTo = acknowledgementsTo.updated(payload, ctx.sender()) Effect.persist(Promised(payload)) .andThen(_ => performAsyncCall(payload).pipeTo(ctx.self)) .andThen(_ => ctx.sender() ! DeliveryScheduled) case ActionPerformed(payload) => Effect.persist(Acknowledged(payload)) .andThen { _ => acknowledgementsTo.get(payload).foreach(_ ! DeliveryAcknowledged) acknowledgementsTo = acknowledgementsTo - payload } } } override def eventHandler(state: State, event: Event): State = event match { case Promised(payload) => state.copy(state.pendingPayloads :+ payload) case Acknowledged(payload) => state.copy(state.pendingPayloads.filter(_ != payload)) } override def recoveryCompleted(ctx: ActorContext, state: State): Effect[Event, State] = { if (useThisDcDuringRecovery) { import ctx.dispatcher state.pendingPayloads.foreach(payload => performAsyncCall(payload).pipeTo(ctx.self)) } Effect.none } def performAsyncCall(payload: String): Future[ActionPerformed] = ??? }
- Java
-
package akka.persistence.multidc.testkit.reliabledeliverer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import akka.Done; import akka.actor.ActorRef; import akka.persistence.multidc.javadsl.CommandHandler; import akka.persistence.multidc.javadsl.Effect; import akka.persistence.multidc.javadsl.EventHandler; import akka.persistence.multidc.javadsl.ReplicatedEntity; import static akka.pattern.PatternsCS.pipe; public class ReliableDeliverer extends ReplicatedEntity<Commands.Command, Events.Event, List<String>> { private boolean useThisDcDuringRecovery; // Transient mutable state (not part of the state managed by event sourcing) private Map<String, ActorRef> acknowledgementsTo = new HashMap<>(); public ReliableDeliverer(boolean useThisDcDuringRecovery) { this.useThisDcDuringRecovery = useThisDcDuringRecovery; } @Override public List<String> initialState() { return new ArrayList<>(); } @Override public CommandHandler<Commands.Command, Events.Event, List<String>> commandHandler() { return commandHandlerBuilder(Commands.Command.class) .matchCommand(Commands.Deliver.class, (ctx, state, deliver) -> { acknowledgementsTo.put(deliver.payload, ctx.getSender()); return Effect().persist(new Events.Promised(deliver.payload)) .andThen(s -> pipe(performAsyncCall(deliver.payload), ctx.getDispatcher()).to(ctx.getSelf())) .andThen(s -> ctx.getSender().tell(Messages.DeliveryScheduled.getInstance(), ctx.getSelf())); }) .matchCommand(Commands.ActionPerformed.class, (ctx, state, actionPerformed) -> { return Effect().persist(new Events.Acknowledged(actionPerformed.payload)) .andThen(s -> { ActorRef sender = acknowledgementsTo.get(actionPerformed.payload); if (sender != null) { sender.tell(Messages.DeliveryAcknowledged.getInstance(), ActorRef.noSender()); acknowledgementsTo.remove(actionPerformed.payload); } }); }) .build(); } @Override public EventHandler<Events.Event, List<String>> eventHandler() { return eventHandlerBuilder(Events.Event.class) .matchEvent(Events.Promised.class, (state, event) -> { List<String> newState = new ArrayList<>(state); newState.add(event.payload); return newState; }) .matchEvent(Events.Acknowledged.class, (state, event) -> { List<String> newState = new ArrayList<>(state); newState.remove(event.payload); return newState; }) .build(); } @Override public Effect<Events.Event, List<String>> recoveryCompleted(ActorContext ctx, List<String> state) { if (useThisDcDuringRecovery) { for (String payload : state) { pipe(performAsyncCall(payload), ctx.getDispatcher()).to(ctx.getSelf()); } } return Effect().none(); } CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) { throw new UnsupportedOperationException(); } }
Testing that it indeed sends both a DeliveryScheduled
and a DeliveryAcknowledged
back to the sender for the “happy path” scenario can be demonstrated with the following test:
- Scala
-
import ReliableDeliverer._ import akka.testkit.CallingThreadDispatcher val simulator = createSimulator(() => new ReliableDeliverer() { override def performAsyncCall(payload: String): Future[ActionPerformed] = Future.successful(ActionPerformed(payload)) }, selfDc = allDcs.head, allDcs, Some(CallingThreadDispatcher.Id)) simulator.runCommand(Deliver("Some Payload")) simulator.responses shouldBe Seq(DeliveryScheduled) simulator.selfMessages shouldBe Seq(ActionPerformed("Some Payload")) simulator.runCommand(ActionPerformed("Some Payload")) simulator.responses shouldBe Seq(DeliveryScheduled, DeliveryAcknowledged)
- Java
-
package akka.persistence.multidc.testkit.reliabledeliverer; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import akka.Done; import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.Simulator; import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer; import akka.persistence.multidc.testkit.reliabledeliverer.Commands; import akka.persistence.multidc.testkit.reliabledeliverer.Events; import akka.persistence.multidc.testkit.reliabledeliverer.Messages; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import static org.junit.Assert.assertTrue; public class SenderTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty()); } @Test public void testObserveResponseToSender() { Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) { @Override CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) { return CompletableFuture.completedFuture(new Commands.ActionPerformed(payload)); } }); simulator.runCommand(new Commands.Deliver("Some Payload")); assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance())); assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload"))); simulator.runCommand(new Commands.ActionPerformed("Some Payload")); assertTrue(simulator.getResponses().contains(Messages.DeliveryAcknowledged.getInstance())); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
Testing recovery
Of course it is important to test the logic of your replicated entity is also consistent after recovery. The simulator provides a convenient .restart()
method to start a new instance of the replicated entity and apply the recovery logic. Do note that this does not actually restart the actor system used for hosting the tests.
To demonstrate testing recovery, we can for example show the recovery behavior of the ReliableDeliverer
actor introduced in the previous section:
- Scala
-
import ReliableDeliverer._ var futureToReturn = Promise[ActionPerformed]().future val simulator = createSimulator(() => new ReliableDeliverer() { override def performAsyncCall(payload: String): Future[ActionPerformed] = futureToReturn }, selfDc = allDcs.head, allDcs, Some(CallingThreadDispatcher.Id)) simulator.runCommand(Deliver("Some Payload")) simulator.responses shouldBe Seq(DeliveryScheduled) futureToReturn = Future.successful(ActionPerformed("Some Payload")) simulator.restart() simulator.selfMessages shouldBe Seq(ActionPerformed("Some Payload"))
- Java
-
package akka.persistence.multidc.testkit.reliabledeliverer; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.Simulator; import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer; import akka.persistence.multidc.testkit.reliabledeliverer.Commands; import akka.persistence.multidc.testkit.reliabledeliverer.Events; import akka.persistence.multidc.testkit.reliabledeliverer.Messages; import akka.testkit.CallingThreadDispatcher; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; public class RecoveryTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor, String dispatcherId) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.of(dispatcherId)); } @Test public void testObserveResponseToSender() { final Holder<CompletableFuture<Commands.ActionPerformed>> futureToReturn = new Holder<>(new CompletableFuture<>()); Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) { @Override CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) { System.out.println("Returning " + futureToReturn.value); return futureToReturn.value; } }, CallingThreadDispatcher.Id()); simulator.runCommand(new Commands.Deliver("Some Payload")); assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance())); assertFalse(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload"))); futureToReturn.value = CompletableFuture.completedFuture(new Commands.ActionPerformed("Some Payload")); simulator.restart(); assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload"))); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
Snapshotting
When snapshots are used to make recovery more efficient:
- Scala
-
object Snapshotting { sealed trait Command case class Append(string: String) extends Command case object Get extends Command case object EventsHandled extends Command sealed trait Event case class Appended(string: String) extends Event type State = String } class Snapshotting extends ReplicatedEntity[Snapshotting.Command, Snapshotting.Event, Snapshotting.State] { import Snapshotting._ override val initialState = "" // Sneaky unmanaged state: var eventsHandled = 0 override def commandHandler = CommandHandler { (ctx, state, cmd) => cmd match { case Get => ctx.sender() ! state Effect.none case EventsHandled => ctx.sender() ! eventsHandled Effect.none case Append(string) => Effect.persist(Appended(string)) } } override def eventHandler(state: State, event: Event): State = event match { case Appended(string) => eventsHandled = eventsHandled + 1 state + string } }
- Java
-
package akka.persistence.multidc.testkit.snapshotting; import akka.persistence.multidc.javadsl.CommandHandler; import akka.persistence.multidc.javadsl.EventHandler; import akka.persistence.multidc.javadsl.ReplicatedEntity; public class Snapshotting extends ReplicatedEntity<Commands.Command, Events.Appended, String> { // Sneaky unmanaged state: private Integer eventsHandled = 0; @Override public String initialState() { return ""; } @Override public CommandHandler<Commands.Command, Events.Appended, String> commandHandler() { return commandHandlerBuilder(Commands.Command.class) .matchCommand(Commands.Get.class, (ctx, state, get) -> { ctx.getSender().tell(state, ctx.getSelf()); return Effect().none(); }) .matchCommand(Commands.EventsHandled.class, (ctx, state, eventsHandled) -> { ctx.getSender().tell(this.eventsHandled, ctx.getSelf()); return Effect().none(); }) .matchCommand(Commands.Append.class, (ctx, state, append) -> { return Effect().persist(new Events.Appended(append.string)); }) .build(); } @Override public EventHandler<Events.Appended, String> eventHandler() { return eventHandlerBuilder(Events.Appended.class) .matchEvent(Events.Appended.class, (state, appended) -> { eventsHandled++; return state + appended.string; }) .build(); } }
The simulator correctly takes this into account and replays the events starting from the point of the snapshot:
- Scala
-
import Snapshotting._ val simulator = createSimulator(() => new Snapshotting(), selfDc = allDcs.head, allDcs) simulator.runCommand(Append("First")) simulator.takeSnapshot() simulator.runCommand(Append("Second")) simulator.runCommand(Get) simulator.runCommand(EventsHandled) simulator.responses() shouldBe Seq("FirstSecond", 2) simulator.restart() simulator.runCommand(Get) simulator.runCommand(EventsHandled) // TODO perhaps reading the responses should clear them as a side effect, like expectMessage etc simulator.responses() shouldBe Seq("FirstSecond", 2, "FirstSecond", 1)
- Java
-
package akka.persistence.multidc.testkit.snapshotting; import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.Simulator; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import static org.junit.Assert.assertTrue; public class SnapshottingTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty()); } @Test public void testSnapshotting() { Simulator<Commands.Command, Events.Appended, String> simulator = createSimulator(() -> new Snapshotting()); simulator.runCommand(new Commands.Append("First")); simulator.takeSnapshot(); simulator.runCommand(new Commands.Append("Second")); simulator.runCommand(new Commands.Get()); assertTrue(simulator.responses().contains("FirstSecond")); simulator.runCommand(new Commands.EventsHandled()); assertTrue(simulator.responses().contains(2)); simulator.clearResponses(); simulator.restart(); simulator.runCommand(new Commands.Get()); assertTrue(simulator.responses().contains("FirstSecond")); simulator.runCommand(new Commands.EventsHandled()); assertTrue(simulator.responses().contains(1)); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
Timeouts and Timers
In the following example we have a test that shuts itself down when its receive timeout has been triggered:
- Scala
-
class StopWhenIdle extends ReplicatedEntity[Unit, Unit, Unit] { override def initialState: Unit = () override def commandHandler = CommandHandler { (_, _, _) => Effect.unhandled }.onReceiveTimeout { case (ctx, state) => Effect.stop } override def eventHandler(state: Unit, event: Unit): Unit = () }
- Java
-
package akka.persistence.multidc.testkit.stopwhenidle; import akka.persistence.multidc.javadsl.CommandHandler; import akka.persistence.multidc.javadsl.EventHandler; import akka.persistence.multidc.javadsl.ReplicatedEntity; public class StopWhenIdle extends ReplicatedEntity<Void, Void, Void> { @Override public Void initialState() { return null; } @Override public CommandHandler<Void, Void, Void> commandHandler() { return commandHandlerBuilder(Void.class) .onReceiveTimeout((ctx, state, rt) -> Effect().stop()) .build(); } @Override public EventHandler<Void, Void> eventHandler() { return null; } }
In its test, a receive timeout can be explicitly triggered:
- Scala
-
val simulator = createSimulator(() => new StopWhenIdle(), selfDc = allDcs.head, allDcs) simulator.triggerReceiveTimeout() simulator.stopped() shouldBe true
- Java
-
package akka.persistence.multidc.testkit.stopwhenidle; import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.Simulator; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import static org.junit.Assert.assertTrue; public class ReceiveTimeoutTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty()); } @Test public void testReceiveTimeout() { Simulator simulator = createSimulator(() -> new StopWhenIdle()); simulator.triggerReceiveTimeout(); assertTrue(simulator.stopped()); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
When a replicated entity uses the powerful Timers API:
- Scala
-
object Ticking { case object TickKey sealed trait Command case object FirstTick extends Command case object Tick } class Ticking extends ReplicatedEntity[Ticking.Command, Int, Int] { import scala.concurrent.duration._ import Ticking._ override val initialState = 0 override def commandHandler = CommandHandler { (ctx, state, cmd) => cmd match { case FirstTick => ctx.timers.startPeriodicTimer(TickKey, Tick, 1.second) Effect.none } }.onTimer[Tick.type] { (ctx, state, tick) => tick match { case Tick => Effect.persist(1) } } override def eventHandler(state: Int, event: Int): Int = state + event }
- Java
-
package akka.persistence.multidc.testkit.timers; import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; import akka.persistence.multidc.javadsl.CommandHandler; import akka.persistence.multidc.javadsl.EventHandler; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.timers.Tick; import akka.persistence.multidc.testkit.timers.FirstTick; public class Ticking extends ReplicatedEntity<FirstTick, Integer, Integer> { public static final Object TICK_KEY = "TickKey"; @Override public Integer initialState() { return 0; } @Override public CommandHandler<FirstTick, Integer, Integer> commandHandler() { return commandHandlerBuilder(FirstTick.class) .matchCommand(FirstTick.class, (ctx, state, firstTick) -> { // FIXME Java API for setting timers, not taking a FiniteDuration ctx.getTimers().startPeriodicTimer(TICK_KEY, Tick.getInstance(), Duration.create(1, TimeUnit.SECONDS)); return Effect().none(); }) .onTimer(Tick.class, (ctx, state, tick) -> { return Effect().persist(1); }) .build(); } @Override public EventHandler<Integer, Integer> eventHandler() { return eventHandlerBuilder(Integer.class) .matchAny((state, event) -> state + event); } }
The simulator API allows timers to be inspected and triggered:
- Scala
-
import Ticking._ val simulator = createSimulator(() => new Ticking(), selfDc = allDcs.head, allDcs) simulator.runCommand(FirstTick) simulator.isTimerActive(TickKey) shouldBe true simulator.triggerTimer(TickKey) simulator.isTimerActive(TickKey) shouldBe true simulator.state shouldBe 1
- Java
-
package akka.persistence.multidc.testkit.timers; import java.util.Arrays; import java.util.HashSet; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; import akka.actor.ActorSystem; import akka.persistence.multidc.javadsl.ReplicatedEntity; import akka.persistence.multidc.testkit.timers.Tick; import akka.persistence.multidc.testkit.timers.FirstTick; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import akka.persistence.multidc.testkit.Simulator; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; public class TimerTest extends JUnitSuite { private static ActorSystem system; @BeforeClass public static void startActorSystem() { system = ActorSystem.create("test"); } private String selfDc = "DC-A"; private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B")); public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) { return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty()); } @Test public void testTimers() { Simulator<FirstTick, Integer, Integer> simulator = createSimulator(() -> new Ticking()); simulator.runCommand(FirstTick.getInstance()); assertTrue(simulator.isTimerActive(Ticking.TICK_KEY)); simulator.triggerTimer(Ticking.TICK_KEY); assertTrue(simulator.isTimerActive(Ticking.TICK_KEY)); assertEquals(Integer.valueOf(1), simulator.state()); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); } }
Integration testing
It is also possible to test your replicated entity using an integration test. Such a test starts a number of actor systems, and provides some tools to simulate failures. These would typically connect to a real cassandra instance, making them more faithful but also rather heavy.
To make it easier to spawn a temporary Cassandra instance for testing akka-persistence-multi-dc-testkit
provides a CassandraLifecycle
utility. This can be relatively easily hooked into your test framework, for example for scalatest your could use a traitJUnit you could initialize it like this:
- Scala
-
import akka.testkit.TestKitBase import akka.persistence.cassandra.testkit.CassandraLauncher import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite trait CassandraLifecycleScalatest extends BeforeAndAfterAll { this: TestKitBase with Suite => import CassandraLifecycle._ def systemName: String def cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource override protected def beforeAll(): Unit = { startCassandra(systemName, cassandraConfigResource) awaitPersistenceInit(system) super.beforeAll() } override protected def afterAll(): Unit = { shutdown(system, verifySystemShutdown = true) stopCassandra() super.afterAll() } }
- Java
-
import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import akka.persistence.multidc.testkit.CassandraLifecycle; import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit; public class AuctionExampleTest extends JUnitSuite { private static ActorSystem system; private static ActorSystem otherSystem; @BeforeClass public static void setupSystems() { system = createSystem("DC-A"); CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource()); CassandraLifecycle.awaitPersistenceInit(system); otherSystem = createSystem("DC-B"); CassandraLifecycle.awaitPersistenceInit(otherSystem); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); TestKit.shutdownActorSystem(otherSystem, true); CassandraLifecycle.stopCassandra(); } }
Combined with PersistenceMultiDcTestKit
you can write tests that verify the behavior when replication between systems is temporarily suspended. You will need to add persistenceMultiDcTestSettings
to your actor system configuration.
- Scala
-
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._ // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs))) "correctly resolve concurrent writes when replication is temporarily suspended" in new TestSetup("test6") { disableReplication(otherSystem, system) nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) expectHighestBid(nodeA, "Mary", 12) enableReplication(otherSystem, system) awaitAssert { expectHighestBid(nodeA, "Paul", 42) } }
- Java
-
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit; // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B")))); @Test public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception { new TestSetup("test6") { { PersistenceMultiDcTestKit.disableReplication(otherSystem, system); nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 12); PersistenceMultiDcTestKit.enableReplication(otherSystem, system); awaitAssert(() -> { expectHighestBid(nodeA, "Paul", 42); return null; }); } }; }
A full example, testing the auction actor described here, might look like this:
- Scala
-
import java.time.Instant import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.{ ActorRef, ActorSystem, Props } import akka.persistence.multidc.PersistenceMultiDcSettings import org.scalatest.BeforeAndAfterAll import org.scalatest.Matchers import org.scalatest.WordSpecLike import org.scalatest.BeforeAndAfter import akka.persistence.multidc.testkit._ import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._ import akka.testkit.ImplicitSender import akka.testkit.TestKit object AuctionExampleSpec { val allDcs = Seq("DC-A", "DC-B") val clusterConfig = ConfigFactory.parseString(""" akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster.jmx.multi-mbeans-in-same-jvm = on # avoid eager init in tests akka.persistence.multi-data-center.hot-standby.enabled = off # FIXME when using Akka 2.6 we should use Jackson or JavaSerializable akka.actor.allow-java-serialization = on akka.actor.warn-about-java-serializer-usage = off # speed up joining and such akka.cluster.gossip-interval = 500 ms """) def createSystem(dc: String = allDcs.head) = ActorSystem( "AuctionExampleSpec", clusterConfig .withFallback(CassandraLifecycle.config) // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs))) } class AuctionExampleSpec extends TestKit(AuctionExampleSpec.createSystem()) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfter with CassandraLifecycleScalatest { import AuctionExampleSpec._ val systemName = "AuctionExampleSpec" val settings = PersistenceMultiDcSettings(system) val otherSystem = createSystem(allDcs(1)) val otherSettings = PersistenceMultiDcSettings(otherSystem) val allSystems = Seq(system, otherSystem) class TestSetup(testName: String) { val minimumBid = 12 val auctionSetupA = AuctionSetup("bicycle", Bid("me", minimumBid, Instant.now(), ""), closingAt = Instant.now().plusSeconds(60), responsibleForClosing = true) val auctionSetupB = auctionSetupA.copy(responsibleForClosing = false) val nodeA = system.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupA, settings), s"auction-A-$testName") val nodeB = otherSystem.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupB, otherSettings), s"auction-B-$testName") def expectHighestBid(node: ActorRef): Bid = { node ! GetHighestBid expectMsgType[Bid] } def expectHighestBid(node: ActorRef, bidder: String, expected: MoneyAmount): Unit = { val bid = expectHighestBid(node) bid.offer shouldEqual expected bid.bidder shouldEqual bidder } def expectClosed(node: ActorRef): Unit = { node ! IsClosed expectMsg(true) } } "AuctionExample" should { "propagate highest bid to replicated actor" in new TestSetup("test1") { // simple bidding nodeA ! OfferBid("Mary", 42) expectHighestBid(nodeA, "Mary", minimumBid) // ebay style, still the minimum offer nodeA ! OfferBid("Paul", 41) expectHighestBid(nodeA, "Mary", 41) // ebay style, now updated to the highest counter offer awaitAssert { // check that results have propagated to b expectHighestBid(nodeB, "Mary", 41) // ebay style, now updated to the highest counter offer } // make sure that first bidder still keeps the crown nodeB ! OfferBid("c", 42) expectHighestBid(nodeB, "Mary", 42) } "eventually resolve conflicting bids during auction if bids are highest (but different) in each dc" in new TestSetup("test2") { // highest bid comes first nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 41) awaitAssert { expectHighestBid(nodeA, "Mary", 41) expectHighestBid(nodeB, "Mary", 41) } // highest bid comes second nodeA ! OfferBid("Paul", 50) nodeB ! OfferBid("Kat", 60) awaitAssert { expectHighestBid(nodeA, "Kat", 50) expectHighestBid(nodeB, "Kat", 50) } } "eventually resolve conflicting bids during auction if bids are highest and equal (but different time) in each dc" in new TestSetup("test3") { nodeA ! OfferBid("Mary", 15) expectHighestBid(nodeA, "Mary", 12) nodeB ! OfferBid("Paul", 15) awaitAssert { expectHighestBid(nodeA, "Mary", 15) expectHighestBid(nodeB, "Mary", 15) } } "eventually come to a consistent final result" in new TestSetup("test4") { nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) Thread.sleep(3000) // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeA ! Finish awaitAssert { expectClosed(nodeA) expectClosed(nodeB) } expectHighestBid(nodeA, "Paul", 43) expectHighestBid(nodeB, "Paul", 43) } "eventually come to a consistent final result when finishing is initiated on node B" in new TestSetup("test5") { nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) Thread.sleep(3000) // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeB ! Finish awaitAssert { expectClosed(nodeA) expectClosed(nodeB) } expectHighestBid(nodeA, "Paul", 43) expectHighestBid(nodeB, "Paul", 43) } "correctly resolve concurrent writes when replication is temporarily suspended" in new TestSetup("test6") { disableReplication(otherSystem, system) nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) expectHighestBid(nodeA, "Mary", 12) enableReplication(otherSystem, system) awaitAssert { expectHighestBid(nodeA, "Paul", 42) } } } after { allSystems.foreach { sys => enableAllReplicationTo(sys) } } override protected def afterAll(): Unit = { allSystems.foreach(sys => shutdown(sys)) super.afterAll() } }
- Java
-
import java.io.Serializable; import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.JAPI; import akka.persistence.cassandra.testkit.CassandraLauncher; import akka.persistence.multidc.PersistenceMultiDcSettings; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import akka.persistence.multidc.testkit.CassandraLifecycle; import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit; import scala.concurrent.duration.Duration; import static org.junit.Assert.assertEquals; public class AuctionExampleTest extends JUnitSuite { static class Bid implements Serializable { final String bidder; final int offer; final Instant timestamp; final String originDc; Bid(String bidder, int offer, Instant timestamp, String originDc) { this.bidder = bidder; this.offer = offer; this.timestamp = timestamp; this.originDc = originDc; } Bid withOffer(int offer) { return new Bid(bidder, offer, timestamp, originDc); } } // commands interface AuctionCommand extends Serializable {} static class OfferBid implements AuctionCommand { final String bidder; final int offer; public OfferBid(String bidder, int offer) { this.bidder = bidder; this.offer = offer; } } // An auction coordinator needs to schedule this event to each replica static class Finish implements AuctionCommand { static final Finish INSTANCE = new Finish(); private Finish() {} } static class GetHighestBid implements AuctionCommand { static final GetHighestBid INSTANCE = new GetHighestBid(); private GetHighestBid() {} } static class IsClosed implements AuctionCommand { static final IsClosed INSTANCE = new IsClosed(); private IsClosed() {} } // Internal, should not be sent from the outside private static class Close implements AuctionCommand { static final Close INSTANCE = new Close(); private Close() {} } // events interface AuctionEvent extends Serializable {} static class BidRegistered implements AuctionEvent { final Bid bid; public BidRegistered(Bid bid) { this.bid = bid; } } static class AuctionFinished implements AuctionEvent { final String atDc; public AuctionFinished(String atDc) { this.atDc = atDc; } } static class WinnerDecided implements AuctionEvent { final String atDc; final Bid winningBid; final int highestCounterOffer; public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) { this.atDc = atDc; this.winningBid = winningBid; this.highestCounterOffer = highestCounterOffer; } } static class AuctionState { final boolean stillRunning; final Bid highestBid; // in ebay style auctions, we need to keep track of current highest counter offer final int highestCounterOffer; final Set<String> finishedAtDc; AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) { this.stillRunning = stillRunning; this.highestBid = highestBid; this.highestCounterOffer = highestCounterOffer; this.finishedAtDc = finishedAtDc; } AuctionState withNewHighestBid(Bid bid) { assert(stillRunning); assert(isHigherBid(bid, highestBid)); return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around } AuctionState withTooLowBid(Bid bid) { assert(stillRunning); assert(isHigherBid(highestBid, bid)); return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc); } static Boolean isHigherBid(Bid first, Bid second) { return first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0); } AuctionState addFinishedAtDc(String dc) { Set<String> s = new HashSet<>(finishedAtDc); s.add(dc); return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s)); } public AuctionState close() { return new AuctionState( false, highestBid, highestCounterOffer, Collections.emptySet() ); } public boolean isClosed() { return !stillRunning && finishedAtDc.isEmpty(); } } private static final String FINISH_TIMER = "FinishTimer"; static class AuctionSetup { final String name; final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner final Instant closingAt; final boolean responsibleForClosing; public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) { this.name = name; this.initialBid = initialBid; this.closingAt = closingAt; this.responsibleForClosing = responsibleForClosing; } } static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> { final AuctionSetup auctionSetup; public AuctionEntity(AuctionSetup auctionSetup) { this.auctionSetup = auctionSetup; } @Override public AuctionState initialState() { return new AuctionState( true, auctionSetup.initialBid, auctionSetup.initialBid.offer, Collections.emptySet()); } @Override public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() { return byStateCommandHandlerBuilder(AuctionState.class) .matchState(state -> state.stillRunning, running()) .matchAny(finished()); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { return Effect().persist(new BidRegistered(new Bid(offerBid.bidder, offerBid.offer, Instant.ofEpochMilli(currentTimeMillis()), selfDc()))); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid.withOffer(state.highestCounterOffer), getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().warning("Premature close"); // Close should only be triggered when we have already finished return Effect().unhandled(); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(false, getSelf()); return Effect().none(); }).build(); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { // auction finished, no more bids accepted return Effect().unhandled(); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid, getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().info("Close"); // TODO send email (before or after persisting) return Effect().persist(new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer)); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.isClosed(), getSelf()); return Effect().none(); }).build(); } @Override public EventHandler<AuctionEvent, AuctionState> eventHandler() { return eventHandlerBuilder(AuctionEvent.class) .matchEvent(BidRegistered.class, (state, event) -> { if (AuctionState.isHigherBid(event.bid, state.highestBid)) { return state.withNewHighestBid(event.bid); } else { return state.withTooLowBid(event.bid); } }).matchEvent(AuctionFinished.class, (state, event) -> { if (state.isClosed()) return state; // already closed else return state.addFinishedAtDc(event.atDc); }).matchEvent(WinnerDecided.class, (state, event) -> { return state.close(); }) .build(); } private boolean shouldClose(AuctionState state) { return auctionSetup.responsibleForClosing && !state.isClosed() && getAllDcs().equals(state.finishedAtDc); } private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) { if (shouldClose(state)) ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf()); } @Override public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx, AuctionState state, AuctionEvent event) { if (event instanceof AuctionFinished && !state.isClosed()) { AuctionFinished finished = (AuctionFinished) event; log().info("AuctionFinished at {}, already finished at [{}]", finished.atDc, state.finishedAtDc); ActorContext actorCtx = ctx.actorContext(); if (state.finishedAtDc.contains(getSelfDc())) { triggerCloseIfNeeded(actorCtx, state); } else { log().info("Sending finish to self"); actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf()); } } return Effect().none(); } @Override public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) { triggerCloseIfNeeded(ctx, state); long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - currentTimeMillis(); ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE, Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS)); return Effect().none(); } } static Props auctionProps(String pid, AuctionSetup auctionSetup, PersistenceMultiDcSettings settings) { return ReplicatedEntity.props( AuctionCommand.class, "auction", pid, () -> new AuctionEntity(auctionSetup), settings); } static private class TestSetup extends TestKit { final String testName; final int minimumBid = 12; final AuctionSetup auctionSetupA; final AuctionSetup auctionSetupB; final ActorRef nodeA; final ActorRef nodeB; TestSetup(String testName) { super(system); this.testName = testName; auctionSetupA = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), true); auctionSetupB = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), false); PersistenceMultiDcSettings settings = PersistenceMultiDcSettings.create(system); PersistenceMultiDcSettings otherSettings = PersistenceMultiDcSettings.create(otherSystem); nodeA = system.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupA, settings), "auction-A-" + testName); nodeB = otherSystem.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupB, otherSettings), "auction-B-" + testName); } Bid expectHighestBid(ActorRef node) { node.tell(GetHighestBid.INSTANCE, getTestActor()); return expectMsgClass(Bid.class); } void expectHighestBid(ActorRef node, String bidder, int expected) { Bid bid = expectHighestBid(node); assertEquals(expected, bid.offer); assertEquals(bidder, bid.bidder); } void expectClosed(ActorRef node) { node.tell(IsClosed.INSTANCE, getTestActor()); expectMsg(true); } } private static Config clusterConfig = ConfigFactory.parseString(String.join("\n", "akka.actor.provider = \"cluster\"", "akka.remote.netty.tcp.port = 0", "akka.remote.classic.netty.tcp.port = 0", "akka.remote.artery.canonical.port = 0", "akka.remote.artery.canonical.hostname = 127.0.0.1", "akka.cluster.jmx.multi-mbeans-in-same-jvm = on", // avoid eager init in tests "akka.persistence.multi-data-center.hot-standby.enabled = off", // FIXME when using Akka 2.6 we should use Jackson or JavaSerializable "akka.actor.allow-java-serialization = on", "akka.actor.warn-about-java-serializer-usage = off", // speed up joining and such "akka.cluster.gossip-interval = 500 ms")); private static ActorSystem createSystem(String dc) { return ActorSystem.create( "auctionTest", clusterConfig .withFallback(CassandraLifecycle.config()) // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B")))); } private static ActorSystem system; private static ActorSystem otherSystem; @BeforeClass public static void setupSystems() { system = createSystem("DC-A"); CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource()); CassandraLifecycle.awaitPersistenceInit(system); otherSystem = createSystem("DC-B"); CassandraLifecycle.awaitPersistenceInit(otherSystem); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); TestKit.shutdownActorSystem(otherSystem, true); CassandraLifecycle.stopCassandra(); } @Test public void propagateHighestBidToReplicatedActor() { new TestSetup("test-1") { { // simple bidding nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", minimumBid); // ebay style, still the minimum offer nodeA.tell(new OfferBid("Paul", 41), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 41); // ebay style, now updated to the highest counter offer awaitAssert( () -> { expectHighestBid(nodeB, "Mary", 41); return null; }); awaitAssert( () -> { // check that results have propagated to b expectHighestBid(nodeB, "Mary", 41); // ebay style, now updated to the highest counter offer return null; } ); // make sure that first bidder still keeps the crown nodeB.tell(new OfferBid("c", 42), ActorRef.noSender()); expectHighestBid(nodeB, "Mary", 42); } }; } @Test public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestButDifferentInEachDc() { new TestSetup("test-2") { { // highest bid comes first nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 41), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Mary", 41); expectHighestBid(nodeB, "Mary", 41); return null; }); // highest bid comes second nodeA.tell(new OfferBid("Paul", 50), ActorRef.noSender()); nodeB.tell(new OfferBid("Kat", 60), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Kat", 50); expectHighestBid(nodeB, "Kat", 50); return null; }); } }; } @Test public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestAndEqualButDifferentTimeInEachDc() { new TestSetup("test3") { { nodeA.tell(new OfferBid("Mary", 15), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 12); nodeB.tell(new OfferBid("Paul", 15), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Mary", 15); expectHighestBid(nodeB, "Mary", 15); return null; }); } }; } @Test public void eventuallyComeToAConsistentFinalResult() throws Exception { new TestSetup("test4") { { nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); Thread.sleep(3000); // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeA.tell(Finish.INSTANCE, ActorRef.noSender()); awaitAssert(() -> { expectClosed(nodeA); expectClosed(nodeB); return null; }); expectHighestBid(nodeA, "Paul", 43); expectHighestBid(nodeB, "Paul", 43); } }; } @Test public void eventuallyComeToAConsistentFinalResultWhenFinishingIsInitiatedOnNodeB() throws Exception { new TestSetup("test5") { { nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); Thread.sleep(3000); // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeB.tell(Finish.INSTANCE, ActorRef.noSender()); awaitAssert(() -> { expectClosed(nodeA); expectClosed(nodeB); return null; }); expectHighestBid(nodeA, "Paul", 43); expectHighestBid(nodeB, "Paul", 43); } }; } @Test public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception { new TestSetup("test6") { { PersistenceMultiDcTestKit.disableReplication(otherSystem, system); nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 12); PersistenceMultiDcTestKit.enableReplication(otherSystem, system); awaitAssert(() -> { expectHighestBid(nodeA, "Paul", 42); return null; }); } }; } }
Multi-JVM testing
A Multi JVM Test is even more heavyweight, as it will spawn an entire new JVM for each cluster node. This does allow for even more production-like tests, however bear in mind that it is also harder to interpret any failures.
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.Cluster
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.persistence.multidc.PersistenceMultiDcSettings
import akka.persistence.multidc.scaladsl.ReplicatedEntity
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import akka.testkit.TestProbe
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.persistence.multidc.testkit.CassandraLifecycle
import akka.persistence.multidc.testkit.ContactPoints
import akka.persistence.multidc.internal.MultiNodeClusterSpec
import akka.persistence.multidc.testkit.MultiNodeSessionProvider
object ShardedReplicatedEntitySpec extends MultiNodeConfig {
final val CassPort = 50100
val dc1node1 = role("dc1-node1")
val dc1node2 = role("dc1-node2")
val dc2node3 = role("dc2-node3")
val dc2node4 = role("dc2-node4")
commonConfig(ConfigFactory.parseString(
s"""
akka.loglevel = DEBUG
akka.actor.provider = cluster
akka.testconductor.barrier-timeout = 60 s
cassandra-journal-multi-dc.keyspace = ShardedReplicatedEntitySpec
cassandra-snapshot-store.keyspace = ShardedReplicatedEntitySpecSnapshot
cassandra-journal-multi-dc.port = $CassPort
cassandra-snapshot-store.port = $CassPort
cassandra-journal-multi-dc.session-provider = ${classOf[MultiNodeSessionProvider].getName}
cassandra-snapshot-store.session-provider = ${classOf[MultiNodeSessionProvider].getName}
# avoid eager init in tests
akka.persistence.multi-data-center.hot-standby.enabled = off
# FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
akka.actor.allow-java-serialization = on
akka.actor.warn-about-java-serializer-usage = off
# speed up joining and such
akka.cluster.gossip-interval = 500 ms
""").withFallback(CassandraLifecycle.config))
nodeConfig(dc1node1, dc1node2) {
ConfigFactory.parseString(
"""
|akka.cluster.multi-data-center.self-data-center = DC-1
|akka.persistence.multi-data-center.all-data-centers = [DC-1, DC-2]
""".stripMargin)
}
nodeConfig(dc2node3, dc2node4) {
ConfigFactory.parseString(
"""
|akka.cluster.multi-data-center.self-data-center = DC-2
|akka.persistence.multi-data-center.all-data-centers = [DC-1, DC-2]
""".stripMargin)
}
case class CommandEnvelope(id: Long, command: Command)
sealed trait Command
case class UsageReport(usage: Long, timestamp: Long) extends Command
case object GetUsage extends Command
case object Stop extends Command
sealed trait Event
case class UsageReported(usage: Long, timestamp: Long) extends Event
case class State(totalUsage: Long, timestamp: Long)
def serverProps()(implicit system: ActorSystem): Props =
ReplicatedEntity.clusterShardingProps("server", () => new Server, PersistenceMultiDcSettings(system))
class Server() extends ReplicatedEntity[Command, Event, State] {
override def initialState: State = State(0, -1)
override def commandHandler = CommandHandler {
case (ctx, _, UsageReport(usage, ts)) => Effect.persist(UsageReported(usage, ts))
case (ctx, state, GetUsage) =>
ctx.sender() ! state
Effect.none
case (ctx, _, Stop) =>
ctx.self ! PoisonPill
Effect.none
}
override def eventHandler(currentState: State, event: Event): State = event match {
case UsageReported(usage, ts) => State(currentState.totalUsage + usage, ts)
}
}
}
class ShardedReplicatedEntitySpecMultiJvmNode1 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode2 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode3 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode4 extends ShardedReplicatedEntitySpec
abstract class ShardedReplicatedEntitySpec extends MultiNodeSpec(ShardedReplicatedEntitySpec)
with MultiNodeClusterSpec with WordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll {
import ShardedReplicatedEntitySpec._
import CassandraLifecycle._
val extractEntityId: ShardRegion.ExtractEntityId = {
case CommandEnvelope(id, command) => (id.toString, command)
}
val numberOfShards = 100
val extractShardId: ShardRegion.ExtractShardId = {
case CommandEnvelope(id, _) => (id % numberOfShards).toString
case ShardRegion.StartEntity(id) =>
// StartEntity is used by remembering entities feature
(id.toLong % numberOfShards).toString
}
override def beforeAll() = {
val CassHost = node(dc1node1).address.host.get
ContactPoints(system).completeAddressDefaultClusterId(CassHost)
runOn(dc1node1) {
startCassandra(CassHost, CassPort, system.name)
awaitPersistenceInit(system)
}
enterBarrier("cassandra-init")
runOn(dc1node2, dc2node3, dc2node4) {
awaitPersistenceInit(system)
}
enterBarrier("persistence-init")
}
"sharded replicated entity" should {
"replicate events" in {
runOn(dc1node1) {
Cluster(system).join(node(dc1node1).address)
}
runOn(dc2node3) {
Cluster(system).join(node(dc2node3).address)
}
enterBarrier("cluster-init-1")
runOn(dc1node2) {
Cluster(system).join(node(dc1node1).address)
}
runOn(dc2node4) {
Cluster(system).join(node(dc2node3).address)
}
enterBarrier("cluster-init-2")
val region = ClusterSharding(system).start(
typeName = "Server",
entityProps = serverProps(),
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
// this will be sent once on all nodes
region ! CommandEnvelope(1, UsageReport(1, 1))
within(10.seconds) {
val probe = TestProbe()
awaitAssert {
region.tell(CommandEnvelope(1, GetUsage), probe.ref)
probe.expectMsg(3.second, State(4, 1))
}
}
enterBarrier("first-message-done")
runOn(dc1node1) {
// stop replicated entity on the DC-A
region ! CommandEnvelope(1, Stop)
}
runOn(dc2node3) {
// send commands to replicated entity on DC-B
region ! CommandEnvelope(1, UsageReport(1, 2))
region ! CommandEnvelope(1, UsageReport(1, 3))
region ! CommandEnvelope(1, UsageReport(1, 4))
within(10.seconds) {
val probe = TestProbe()
awaitAssert {
region.tell(CommandEnvelope(1, GetUsage), probe.ref)
probe.expectMsg(3.second, State(7, 4))
}
}
}
enterBarrier("commands-sent-single-dc")
runOn(dc1node1) {
// bring back replicated entity on the DC-A
within(10.seconds) {
val probe = TestProbe()
awaitAssert {
region.tell(CommandEnvelope(1, GetUsage), probe.ref)
probe.expectMsg(3.second, State(7, 4))
}
}
}
enterBarrier("finish")
}
}
}