Testing
The akka-persistence-multi-dc-testkit module provides a Simulator for unit testing replicated entities without using a database. It gives fine-grained control for inspecting side effects and for simulating various scenarios and replication orderings.
For more realistic but heavier ‘integration test’-style tests, consider writing a Multi JVM Test in addition to simulator-based tests.
Dependency
To use the simulator and the Cassandra launcher described below you will need the akka-persistence-multi-dc-testkit
dependency:
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html
"com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % "1.1.16" % Test
Before you can access this library, you’ll need to configure the Lightbend repository and credentials in your build.
Basic simulator usage
A simple use of the simulator might look like this:
Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity());

simulator.runCommand(new Commands.Append("Hello"));
simulator.replicatedEvent(new Events.Appended(" World"));
simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar"));

assertEquals("Hello Worldfoobar", simulator.state());
assertEquals(4, simulator.getEvents().size());
The createSimulator helper method is a convenience method that makes it easy to create a Simulator under a mostly-empty ActorSystem and makes sure its lifecycle is correctly tied into your test framework of choice. For example, under JUnit the complete test might look like:
import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;

public class BasicTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testBasicSimulator() {
    Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity());

    simulator.runCommand(new Commands.Append("Hello"));
    simulator.replicatedEvent(new Events.Appended(" World"));
    simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar"));

    assertEquals("Hello Worldfoobar", simulator.state());
    assertEquals(4, simulator.getEvents().size());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
For the sake of completeness, the entity under test looks like:
import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

/**
 * This entity is buggy because replicas will end up in different states when events are persisted
 * concurrently in different data centres.
 */
public class BuggyEntity extends ReplicatedEntity<Commands.Command, Events.Event, String> {
  @Override
  public String initialState() {
    return "";
  }

  @Override
  public CommandHandler<Commands.Command, Events.Event, String> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
      .matchCommand(Commands.Get.class, (ctx, state, get) -> {
        ctx.getSender().tell(state, ctx.getSelf());
        return Effect().none();
      })
      .matchCommand(Commands.Append.class, (ctx, state, append) -> {
        return Effect().persist(new Events.Appended(append.string));
      })
      .build();
  }

  @Override
  public EventHandler<Events.Event, String> eventHandler() {
    return eventHandlerBuilder(Events.Event.class)
      .matchEvent(Events.Appended.class, (state, appended) -> {
        return state + appended.string;
      })
      .build();
  }
}
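Why this entity is buggy can be seen without the simulator. The following is a minimal plain-Java sketch, not part of the testkit: the class name AppendOrderingSketch and the replay helper are hypothetical, and the fold only mirrors the `state + appended.string` event handler of BuggyEntity. It applies the same two events in the two orders that concurrent writes in different data centres could produce.

```java
import java.util.Arrays;
import java.util.List;

public class AppendOrderingSketch {
    // Same fold as the BuggyEntity event handler: each event is appended to the state.
    public static String replay(List<String> appendedStrings) {
        String state = "";
        for (String appended : appendedStrings) {
            state = state + appended;
        }
        return state;
    }

    public static void main(String[] args) {
        // DC-A persists "Hello" locally and then receives " World" from DC-B.
        String dcA = replay(Arrays.asList("Hello", " World"));
        // DC-B persisted " World" first and receives "Hello" afterwards.
        String dcB = replay(Arrays.asList(" World", "Hello"));

        System.out.println("DC-A: [" + dcA + "]");            // DC-A: [Hello World]
        System.out.println("DC-B: [" + dcB + "]");            // DC-B: [ WorldHello]
        System.out.println("converged: " + dcA.equals(dcB));  // converged: false
    }
}
```

Because string concatenation is not commutative, the replicas only agree if every data centre sees the events in the same order, which replication does not guarantee.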
Testing side effects
To demonstrate some more advanced testing scenarios, we introduce a ReliableDeliverer actor implementation which we will test. This actor implements “at-least-once” delivery: after accepting a task and persisting the intent to perform some async action, it immediately sends a DeliveryScheduled acknowledgement to the sender. It then attempts to perform the async call, retrying in case of restarts, and sends a final DeliveryAcknowledged on a best-effort basis. Typically you might also want to trigger retries with a Timer, but this has been left out of this example for brevity.
The actor under test looks like this:
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import akka.Done;
import akka.actor.ActorRef;
import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.Effect;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

import static akka.pattern.PatternsCS.pipe;

public class ReliableDeliverer extends ReplicatedEntity<Commands.Command, Events.Event, List<String>> {
  private boolean useThisDcDuringRecovery;

  // Transient mutable state (not part of the state managed by event sourcing)
  private Map<String, ActorRef> acknowledgementsTo = new HashMap<>();

  public ReliableDeliverer(boolean useThisDcDuringRecovery) {
    this.useThisDcDuringRecovery = useThisDcDuringRecovery;
  }

  @Override
  public List<String> initialState() {
    return new ArrayList<>();
  }

  @Override
  public CommandHandler<Commands.Command, Events.Event, List<String>> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
      .matchCommand(Commands.Deliver.class, (ctx, state, deliver) -> {
        acknowledgementsTo.put(deliver.payload, ctx.getSender());
        return Effect().persist(new Events.Promised(deliver.payload))
          .andThen(s -> pipe(performAsyncCall(deliver.payload), ctx.getDispatcher()).to(ctx.getSelf()))
          .andThen(s -> ctx.getSender().tell(Messages.DeliveryScheduled.getInstance(), ctx.getSelf()));
      })
      .matchCommand(Commands.ActionPerformed.class, (ctx, state, actionPerformed) -> {
        return Effect().persist(new Events.Acknowledged(actionPerformed.payload))
          .andThen(s -> {
            ActorRef sender = acknowledgementsTo.get(actionPerformed.payload);
            if (sender != null) {
              sender.tell(Messages.DeliveryAcknowledged.getInstance(), ActorRef.noSender());
              acknowledgementsTo.remove(actionPerformed.payload);
            }
          });
      })
      .build();
  }

  @Override
  public EventHandler<Events.Event, List<String>> eventHandler() {
    return eventHandlerBuilder(Events.Event.class)
      .matchEvent(Events.Promised.class, (state, event) -> {
        List<String> newState = new ArrayList<>(state);
        newState.add(event.payload);
        return newState;
      })
      .matchEvent(Events.Acknowledged.class, (state, event) -> {
        List<String> newState = new ArrayList<>(state);
        newState.remove(event.payload);
        return newState;
      })
      .build();
  }

  @Override
  public Effect<Events.Event, List<String>> recoveryCompleted(ActorContext ctx, List<String> state) {
    if (useThisDcDuringRecovery) {
      for (String payload : state) {
        pipe(performAsyncCall(payload), ctx.getDispatcher()).to(ctx.getSelf());
      }
    }
    return Effect().none();
  }

  CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
    throw new UnsupportedOperationException();
  }
}
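The event-sourced part of this actor can be reasoned about in isolation. Below is a minimal plain-Java sketch of the state fold only, with no Akka involved. The class name PendingDeliveriesSketch and the string encoding of events ("+x" for Promised, "-x" for Acknowledged) are assumptions made for illustration. Whatever is still in the list after replay is what recoveryCompleted would retry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PendingDeliveriesSketch {
    // Hypothetical event encoding: "+payload" = Promised, "-payload" = Acknowledged.
    public static List<String> replay(List<String> journal) {
        List<String> pending = new ArrayList<>();
        for (String event : journal) {
            String payload = event.substring(1);
            if (event.startsWith("+")) {
                pending.add(payload);     // intent persisted, delivery still outstanding
            } else {
                pending.remove(payload);  // delivery confirmed, nothing left to retry
            }
        }
        return pending;
    }

    public static void main(String[] args) {
        // Two deliveries were promised, only "a" was acknowledged before the restart.
        List<String> toRetry = replay(Arrays.asList("+a", "+b", "-a"));
        System.out.println(toRetry); // [b]
    }
}
```

A payload is retried until its Acknowledged event is persisted, which is why the delivery is at-least-once rather than exactly-once.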
Testing that it indeed sends both a DeliveryScheduled
and a DeliveryAcknowledged
back to the sender for the “happy path” scenario can be demonstrated with the following test:
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import akka.Done;
import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer;
import akka.persistence.multidc.testkit.reliabledeliverer.Commands;
import akka.persistence.multidc.testkit.reliabledeliverer.Events;
import akka.persistence.multidc.testkit.reliabledeliverer.Messages;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import static org.junit.Assert.assertTrue;

public class SenderTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testObserveResponseToSender() {
    Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) {
      @Override
      CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
        return CompletableFuture.completedFuture(new Commands.ActionPerformed(payload));
      }
    });

    simulator.runCommand(new Commands.Deliver("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance()));
    assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));

    simulator.runCommand(new Commands.ActionPerformed("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryAcknowledged.getInstance()));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
Testing recovery
It is also important to test that the logic of your replicated entity remains consistent after recovery. The simulator provides a convenient .restart() method that starts a new instance of the replicated entity and applies the recovery logic. Note that this does not actually restart the actor system used for hosting the tests.
To demonstrate testing recovery, we can for example show the recovery behavior of the ReliableDeliverer
actor introduced in the previous section:
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer;
import akka.persistence.multidc.testkit.reliabledeliverer.Commands;
import akka.persistence.multidc.testkit.reliabledeliverer.Events;
import akka.persistence.multidc.testkit.reliabledeliverer.Messages;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class RecoveryTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor, String dispatcherId) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.of(dispatcherId));
  }

  @Test
  public void testObserveResponseToSender() {
    final Holder<CompletableFuture<Commands.ActionPerformed>> futureToReturn = new Holder<>(new CompletableFuture<>());

    Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) {
      @Override
      CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
        System.out.println("Returning " + futureToReturn.value);
        return futureToReturn.value;
      }
    }, CallingThreadDispatcher.Id());

    simulator.runCommand(new Commands.Deliver("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance()));
    assertFalse(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));

    futureToReturn.value = CompletableFuture.completedFuture(new Commands.ActionPerformed("Some Payload"));
    simulator.restart();
    assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
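The assertFalse/assertTrue pair in this test relies on ordinary CompletableFuture behaviour: a callback registered on a future that never completes never runs, while a callback registered on an already completed future runs immediately. The sketch below shows just that behaviour in plain Java. The class name PendingFutureSketch and the deliver helper are hypothetical and stand in for piping the result to self; they are not the testkit's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PendingFutureSketch {
    // Stand-in for "pipe the async result to self": collect whatever the future yields.
    public static List<String> deliver(CompletableFuture<String> asyncResult) {
        List<String> selfMessages = new ArrayList<>();
        asyncResult.thenAccept(selfMessages::add);
        return selfMessages;
    }

    public static void main(String[] args) {
        // First attempt: the async call never completes, so nothing reaches the entity.
        List<String> beforeRestart = deliver(new CompletableFuture<>());
        System.out.println(beforeRestart.isEmpty()); // true

        // After the restart the retried call completes right away.
        List<String> afterRestart = deliver(CompletableFuture.completedFuture("Some Payload"));
        System.out.println(afterRestart); // [Some Payload]
    }
}
```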
Snapshotting
When snapshots are used to make recovery more efficient:
package akka.persistence.multidc.testkit.snapshotting;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

public class Snapshotting extends ReplicatedEntity<Commands.Command, Events.Appended, String> {
  // Sneaky unmanaged state:
  private Integer eventsHandled = 0;

  @Override
  public String initialState() {
    return "";
  }

  @Override
  public CommandHandler<Commands.Command, Events.Appended, String> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
      .matchCommand(Commands.Get.class, (ctx, state, get) -> {
        ctx.getSender().tell(state, ctx.getSelf());
        return Effect().none();
      })
      .matchCommand(Commands.EventsHandled.class, (ctx, state, eventsHandled) -> {
        ctx.getSender().tell(this.eventsHandled, ctx.getSelf());
        return Effect().none();
      })
      .matchCommand(Commands.Append.class, (ctx, state, append) -> {
        return Effect().persist(new Events.Appended(append.string));
      })
      .build();
  }

  @Override
  public EventHandler<Events.Appended, String> eventHandler() {
    return eventHandlerBuilder(Events.Appended.class)
      .matchEvent(Events.Appended.class, (state, appended) -> {
        eventsHandled++;
        return state + appended.string;
      })
      .build();
  }
}
The simulator correctly takes this into account and replays the events starting from the point of the snapshot:
package akka.persistence.multidc.testkit.snapshotting;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static org.junit.Assert.assertTrue;

public class SnapshottingTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testSnapshotting() {
    Simulator<Commands.Command, Events.Appended, String> simulator = createSimulator(() -> new Snapshotting());

    simulator.runCommand(new Commands.Append("First"));
    simulator.takeSnapshot();
    simulator.runCommand(new Commands.Append("Second"));

    simulator.runCommand(new Commands.Get());
    assertTrue(simulator.responses().contains("FirstSecond"));
    simulator.runCommand(new Commands.EventsHandled());
    assertTrue(simulator.responses().contains(2));

    simulator.clearResponses();
    simulator.restart();

    simulator.runCommand(new Commands.Get());
    assertTrue(simulator.responses().contains("FirstSecond"));
    simulator.runCommand(new Commands.EventsHandled());
    assertTrue(simulator.responses().contains(1));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
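The reason the EventsHandled answer drops from 2 to 1 after the restart can be shown with a small model of snapshot-based recovery. This is a plain-Java sketch under stated assumptions: SnapshotReplaySketch, Recovered and recover are hypothetical names, and the model only covers a string state plus an unmanaged counter, as in the Snapshotting entity.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SnapshotReplaySketch {
    public static final class Recovered {
        public final String state;
        public final int eventsHandled;

        Recovered(String state, int eventsHandled) {
            this.state = state;
            this.eventsHandled = eventsHandled;
        }
    }

    // Recovery starts from the snapshot state and replays only the later events.
    public static Recovered recover(String snapshotState, List<String> eventsAfterSnapshot) {
        String state = snapshotState;
        int eventsHandled = 0; // unmanaged field: not in the snapshot, so it restarts at zero
        for (String appended : eventsAfterSnapshot) {
            state = state + appended;
            eventsHandled++;
        }
        return new Recovered(state, eventsHandled);
    }

    public static void main(String[] args) {
        // Snapshot taken after "First"; only "Second" is replayed on restart.
        Recovered withSnapshot = recover("First", Collections.singletonList("Second"));
        System.out.println(withSnapshot.state + " " + withSnapshot.eventsHandled); // FirstSecond 1

        // Without a snapshot both events are replayed from the initial state.
        Recovered fullReplay = recover("", Arrays.asList("First", "Second"));
        System.out.println(fullReplay.state + " " + fullReplay.eventsHandled); // FirstSecond 2
    }
}
```

The managed state is identical in both cases; only state kept outside event sourcing reveals whether a snapshot was used, which is what the test exploits.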
Timeouts and Timers
In the following example we have an entity that shuts itself down when its receive timeout has been triggered:
package akka.persistence.multidc.testkit.stopwhenidle;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

public class StopWhenIdle extends ReplicatedEntity<Void, Void, Void> {
  @Override
  public Void initialState() {
    return null;
  }

  @Override
  public CommandHandler<Void, Void, Void> commandHandler() {
    return commandHandlerBuilder(Void.class)
      .onReceiveTimeout((ctx, state, rt) -> Effect().stop())
      .build();
  }

  @Override
  public EventHandler<Void, Void> eventHandler() {
    return null;
  }
}
In its test, a receive timeout can be explicitly triggered:
package akka.persistence.multidc.testkit.stopwhenidle;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.junit.Assert.assertTrue;

public class ReceiveTimeoutTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testReceiveTimeout() {
    // Typed instead of raw, matching StopWhenIdle's type parameters
    Simulator<Void, Void, Void> simulator = createSimulator(() -> new StopWhenIdle());

    simulator.triggerReceiveTimeout();
    assertTrue(simulator.stopped());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
When a replicated entity uses the powerful Timers API:
package akka.persistence.multidc.testkit.timers;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.timers.Tick;
import akka.persistence.multidc.testkit.timers.FirstTick;

public class Ticking extends ReplicatedEntity<FirstTick, Integer, Integer> {
  public static final Object TICK_KEY = "TickKey";

  @Override
  public Integer initialState() {
    return 0;
  }

  @Override
  public CommandHandler<FirstTick, Integer, Integer> commandHandler() {
    return commandHandlerBuilder(FirstTick.class)
      .matchCommand(FirstTick.class, (ctx, state, firstTick) -> {
        // FIXME Java API for setting timers, not taking a FiniteDuration
        ctx.getTimers().startPeriodicTimer(TICK_KEY, Tick.getInstance(), Duration.create(1, TimeUnit.SECONDS));
        return Effect().none();
      })
      .onTimer(Tick.class, (ctx, state, tick) -> {
        return Effect().persist(1);
      })
      .build();
  }

  @Override
  public EventHandler<Integer, Integer> eventHandler() {
    return eventHandlerBuilder(Integer.class)
      .matchAny((state, event) -> state + event);
  }
}
The simulator API allows timers to be inspected and triggered:
package akka.persistence.multidc.testkit.timers;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.timers.Tick;
import akka.persistence.multidc.testkit.timers.FirstTick;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import akka.persistence.multidc.testkit.Simulator;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;

public class TimerTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testTimers() {
    Simulator<FirstTick, Integer, Integer> simulator = createSimulator(() -> new Ticking());

    simulator.runCommand(FirstTick.getInstance());
    assertTrue(simulator.isTimerActive(Ticking.TICK_KEY));

    simulator.triggerTimer(Ticking.TICK_KEY);
    assertTrue(simulator.isTimerActive(Ticking.TICK_KEY));
    assertEquals(Integer.valueOf(1), simulator.state());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}
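The idea behind triggering timers by hand is that the test, not the wall clock, decides when a tick happens. The following plain-Java sketch illustrates that idea only. ManualTimersSketch and its methods are hypothetical and are not how the Simulator is implemented; they merely mimic the shape of the isTimerActive and triggerTimer calls used in the test.

```java
import java.util.HashMap;
import java.util.Map;

public class ManualTimersSketch {
    // Periodic timers by key; nothing here ever fires on its own.
    private final Map<Object, Runnable> periodicTimers = new HashMap<>();

    public void startPeriodicTimer(Object key, Runnable onTick) {
        periodicTimers.put(key, onTick);
    }

    public boolean isTimerActive(Object key) {
        return periodicTimers.containsKey(key);
    }

    // Fires the timer once, exactly when the test asks for it.
    public void triggerTimer(Object key) {
        Runnable onTick = periodicTimers.get(key);
        if (onTick == null) {
            throw new IllegalStateException("No active timer for key " + key);
        }
        onTick.run(); // a periodic timer stays registered after firing
    }

    public static void main(String[] args) {
        ManualTimersSketch timers = new ManualTimersSketch();
        int[] state = {0};

        timers.startPeriodicTimer("TickKey", () -> state[0] += 1);
        timers.triggerTimer("TickKey");

        System.out.println(state[0]);                        // 1
        System.out.println(timers.isTimerActive("TickKey")); // true
    }
}
```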
Integration testing
It is also possible to test your replicated entity using an integration test. Such a test starts a number of actor systems and provides some tools to simulate failures. These tests would typically connect to a real Cassandra instance, making them more faithful but also rather heavy.
To make it easier to spawn a temporary Cassandra instance for testing, akka-persistence-multi-dc-testkit provides a CassandraLifecycle utility. It can be hooked into your test framework relatively easily. For example, for JUnit you could initialize it like this:
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.actor.ActorSystem;
import akka.persistence.cassandra.testkit.CassandraLauncher;
import akka.persistence.multidc.testkit.CassandraLifecycle;
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;
import akka.testkit.javadsl.TestKit;

public class AuctionExampleTest extends JUnitSuite {
  private static ActorSystem system;
  private static ActorSystem otherSystem;

  // createSystem(dc) is part of the full example and is not shown in this excerpt
  @BeforeClass
  public static void setupSystems() {
    system = createSystem("DC-A");
    CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource());
    CassandraLifecycle.awaitPersistenceInit(system);

    otherSystem = createSystem("DC-B");
    CassandraLifecycle.awaitPersistenceInit(otherSystem);
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
    TestKit.shutdownActorSystem(otherSystem, true);
    CassandraLifecycle.stopCassandra();
  }
}
Combined with PersistenceMultiDcTestKit
you can write tests that verify the behavior when replication between systems is temporarily suspended. You will need to add persistenceMultiDcTestSettings
to your actor system configuration.
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;

// Include persistenceMultiDcTestSettings in your ActorSystem configuration
// to allow suspending replication through PersistenceMultiDcTestKit
.withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B"))));

@Test
public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception {
  new TestSetup("test6") {
    {
      PersistenceMultiDcTestKit.disableReplication(otherSystem, system);

      nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
      nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());
      expectHighestBid(nodeA, "Mary", 12);

      PersistenceMultiDcTestKit.enableReplication(otherSystem, system);

      awaitAssert(() -> {
        expectHighestBid(nodeA, "Paul", 42);
        return null;
      });
    }
  };
}
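The expected values in this test ("Mary" at 12, then "Paul" at 42) follow from how the auction entity folds BidRegistered events: the highest bidder wins, and the reported price is the highest losing offer. The plain-Java sketch below models that fold to show that both replicas converge regardless of the order in which the two bids arrive. It is a simplification made for illustration: AuctionConvergenceSketch, State, register and visibleBid are hypothetical names, and the timestamp and data-centre tie-breakers of the real entity are left out.

```java
public class AuctionConvergenceSketch {
    /** Simplified auction state: current winner plus the highest losing offer. */
    public static final class State {
        public final String bidder;
        public final int offer;
        public final int highestCounterOffer;

        public State(String bidder, int offer, int highestCounterOffer) {
            this.bidder = bidder;
            this.offer = offer;
            this.highestCounterOffer = highestCounterOffer;
        }

        /** Applies one registered bid; ties are not modelled in this sketch. */
        public State register(String newBidder, int newOffer) {
            if (newOffer > offer) {
                // New winner: the previous winning offer becomes the counter offer.
                return new State(newBidder, newOffer, offer);
            }
            // Losing bid: it can only raise the counter offer.
            return new State(bidder, offer, Math.max(highestCounterOffer, newOffer));
        }

        /** What a running auction reports: the winner at the counter-offer price. */
        public String visibleBid() {
            return bidder + "@" + highestCounterOffer;
        }
    }

    public static void main(String[] args) {
        State initial = new State("me", 12, 12);

        // Node A while replication is suspended: it has only seen Mary's bid.
        System.out.println(initial.register("Mary", 42).visibleBid()); // Mary@12

        // After replication resumes, each node sees both bids, in different orders.
        State nodeA = initial.register("Mary", 42).register("Paul", 43);
        State nodeB = initial.register("Paul", 43).register("Mary", 42);
        System.out.println(nodeA.visibleBid()); // Paul@42
        System.out.println(nodeB.visibleBid()); // Paul@42
    }
}
```

Unlike the string append in BuggyEntity, this fold gives the same result for either event order, which is the property the integration test verifies against real replication.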
A full example, testing the auction actor described here, might look like this:
import java.io.Serializable; import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.JAPI; import akka.persistence.cassandra.testkit.CassandraLauncher; import akka.persistence.multidc.PersistenceMultiDcSettings; import akka.testkit.javadsl.TestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import akka.persistence.multidc.testkit.CassandraLifecycle; import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit; import scala.concurrent.duration.Duration; import static org.junit.Assert.assertEquals; public class AuctionExampleTest extends JUnitSuite { static class Bid implements Serializable { final String bidder; final int offer; final Instant timestamp; final String originDc; Bid(String bidder, int offer, Instant timestamp, String originDc) { this.bidder = bidder; this.offer = offer; this.timestamp = timestamp; this.originDc = originDc; } Bid withOffer(int offer) { return new Bid(bidder, offer, timestamp, originDc); } } // commands interface AuctionCommand extends Serializable {} static class OfferBid implements AuctionCommand { final String bidder; final int offer; public OfferBid(String bidder, int offer) { this.bidder = bidder; this.offer = offer; } } // An auction coordinator needs to schedule this event to each replica static class Finish implements AuctionCommand { static final Finish INSTANCE = new Finish(); private Finish() {} } static class GetHighestBid implements AuctionCommand { static final GetHighestBid INSTANCE = new GetHighestBid(); private GetHighestBid() {} } static class IsClosed implements AuctionCommand { static final IsClosed INSTANCE = new IsClosed(); private IsClosed() {} } // Internal, should not be 
sent from the outside private static class Close implements AuctionCommand { static final Close INSTANCE = new Close(); private Close() {} } // events interface AuctionEvent extends Serializable {} static class BidRegistered implements AuctionEvent { final Bid bid; public BidRegistered(Bid bid) { this.bid = bid; } } static class AuctionFinished implements AuctionEvent { final String atDc; public AuctionFinished(String atDc) { this.atDc = atDc; } } static class WinnerDecided implements AuctionEvent { final String atDc; final Bid winningBid; final int highestCounterOffer; public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) { this.atDc = atDc; this.winningBid = winningBid; this.highestCounterOffer = highestCounterOffer; } } static class AuctionState { final boolean stillRunning; final Bid highestBid; // in ebay style auctions, we need to keep track of current highest counter offer final int highestCounterOffer; final Set<String> finishedAtDc; AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) { this.stillRunning = stillRunning; this.highestBid = highestBid; this.highestCounterOffer = highestCounterOffer; this.finishedAtDc = finishedAtDc; } AuctionState withNewHighestBid(Bid bid) { assert(stillRunning); assert(isHigherBid(bid, highestBid)); return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around } AuctionState withTooLowBid(Bid bid) { assert(stillRunning); assert(isHigherBid(highestBid, bid)); return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc); } static Boolean isHigherBid(Bid first, Bid second) { return first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, 
so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0); } AuctionState addFinishedAtDc(String dc) { Set<String> s = new HashSet<>(finishedAtDc); s.add(dc); return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s)); } public AuctionState close() { return new AuctionState( false, highestBid, highestCounterOffer, Collections.emptySet() ); } public boolean isClosed() { return !stillRunning && finishedAtDc.isEmpty(); } } private static final String FINISH_TIMER = "FinishTimer"; static class AuctionSetup { final String name; final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner final Instant closingAt; final boolean responsibleForClosing; public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) { this.name = name; this.initialBid = initialBid; this.closingAt = closingAt; this.responsibleForClosing = responsibleForClosing; } } static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> { final AuctionSetup auctionSetup; public AuctionEntity(AuctionSetup auctionSetup) { this.auctionSetup = auctionSetup; } @Override public AuctionState initialState() { return new AuctionState( true, auctionSetup.initialBid, auctionSetup.initialBid.offer, Collections.emptySet()); } @Override public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() { return byStateCommandHandlerBuilder(AuctionState.class) .matchState(state -> state.stillRunning, running()) .matchAny(finished()); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { return Effect().persist(new BidRegistered(new Bid(offerBid.bidder, offerBid.offer, 
Instant.ofEpochMilli(currentTimeMillis()), selfDc()))); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid.withOffer(state.highestCounterOffer), getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().warning("Premature close"); // Close should only be triggered when we have already finished return Effect().unhandled(); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(false, getSelf()); return Effect().none(); }).build(); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { // auction finished, no more bids accepted return Effect().unhandled(); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid, getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().info("Close"); // TODO send email (before or after persisting) return Effect().persist(new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer)); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.isClosed(), getSelf()); return Effect().none(); }).build(); } @Override public EventHandler<AuctionEvent, AuctionState> eventHandler() { return eventHandlerBuilder(AuctionEvent.class) .matchEvent(BidRegistered.class, (state, event) -> { if (AuctionState.isHigherBid(event.bid, state.highestBid)) { return state.withNewHighestBid(event.bid); } else { return state.withTooLowBid(event.bid); } 
        })
        .matchEvent(AuctionFinished.class, (state, event) -> {
          if (state.isClosed()) return state; // already closed
          else return state.addFinishedAtDc(event.atDc);
        })
        .matchEvent(WinnerDecided.class, (state, event) -> {
          return state.close();
        })
        .build();
    }

    private boolean shouldClose(AuctionState state) {
      return auctionSetup.responsibleForClosing && !state.isClosed() &&
        getAllDcs().equals(state.finishedAtDc);
    }

    private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) {
      if (shouldClose(state)) ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf());
    }

    @Override
    public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx, AuctionState state, AuctionEvent event) {
      if (event instanceof AuctionFinished && !state.isClosed()) {
        AuctionFinished finished = (AuctionFinished) event;
        log().info("AuctionFinished at {}, already finished at [{}]", finished.atDc, state.finishedAtDc);
        ActorContext actorCtx = ctx.actorContext();
        if (state.finishedAtDc.contains(getSelfDc())) {
          triggerCloseIfNeeded(actorCtx, state);
        } else {
          log().info("Sending finish to self");
          actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf());
        }
      }
      return Effect().none();
    }

    @Override
    public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) {
      triggerCloseIfNeeded(ctx, state);

      long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - currentTimeMillis();
      ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE, Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS));
      return Effect().none();
    }
  }

  static Props auctionProps(String pid, AuctionSetup auctionSetup, PersistenceMultiDcSettings settings) {
    return ReplicatedEntity.props(
      AuctionCommand.class,
      "auction",
      pid,
      () -> new AuctionEntity(auctionSetup),
      settings);
  }

  static private class TestSetup extends TestKit {
    final String testName;
    final int minimumBid = 12;
    final AuctionSetup auctionSetupA;
    final AuctionSetup auctionSetupB;
    final ActorRef nodeA;
    final ActorRef nodeB;

    TestSetup(String testName) {
      super(system);
      this.testName = testName;
      auctionSetupA = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), true);
      auctionSetupB = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), false);

      PersistenceMultiDcSettings settings = PersistenceMultiDcSettings.create(system);
      PersistenceMultiDcSettings otherSettings = PersistenceMultiDcSettings.create(otherSystem);
      nodeA = system.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupA, settings), "auction-A-" + testName);
      nodeB = otherSystem.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupB, otherSettings), "auction-B-" + testName);
    }

    Bid expectHighestBid(ActorRef node) {
      node.tell(GetHighestBid.INSTANCE, getTestActor());
      return expectMsgClass(Bid.class);
    }

    void expectHighestBid(ActorRef node, String bidder, int expected) {
      Bid bid = expectHighestBid(node);
      assertEquals(expected, bid.offer);
      assertEquals(bidder, bid.bidder);
    }

    void expectClosed(ActorRef node) {
      node.tell(IsClosed.INSTANCE, getTestActor());
      expectMsg(true);
    }
  }

  private static Config clusterConfig = ConfigFactory.parseString(String.join("\n",
    "akka.actor.provider = \"cluster\"",
    "akka.remote.netty.tcp.port = 0",
    "akka.remote.classic.netty.tcp.port = 0",
    "akka.remote.artery.canonical.port = 0",
    "akka.remote.artery.canonical.hostname = 127.0.0.1",
    "akka.cluster.jmx.multi-mbeans-in-same-jvm = on",
    // avoid eager init in tests
    "akka.persistence.multi-data-center.hot-standby.enabled = off",
    // FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
    "akka.actor.allow-java-serialization = on",
    "akka.actor.warn-about-java-serializer-usage = off",
    // speed up joining and such
    "akka.cluster.gossip-interval = 500 ms"));

  private static ActorSystem createSystem(String dc) {
    return ActorSystem.create(
      "auctionTest",
      clusterConfig
        .withFallback(CassandraLifecycle.config())
        // Include
        // persistenceMultiDcTestSettings in your ActorSystem configuration
        // to allow suspending replication through PersistenceMultiDcTestKit
        .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B"))));
  }

  private static ActorSystem system;
  private static ActorSystem otherSystem;

  @BeforeClass
  public static void setupSystems() {
    system = createSystem("DC-A");
    CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource());
    CassandraLifecycle.awaitPersistenceInit(system);
    otherSystem = createSystem("DC-B");
    CassandraLifecycle.awaitPersistenceInit(otherSystem);
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
    TestKit.shutdownActorSystem(otherSystem, true);
    CassandraLifecycle.stopCassandra();
  }

  @Test
  public void propagateHighestBidToReplicatedActor() {
    new TestSetup("test-1") {
      {
        // simple bidding
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", minimumBid); // ebay style, still the minimum offer

        nodeA.tell(new OfferBid("Paul", 41), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", 41); // ebay style, now updated to the highest counter offer

        awaitAssert(() -> {
          expectHighestBid(nodeB, "Mary", 41);
          return null;
        });
        awaitAssert(() -> {
          // check that results have propagated to b
          expectHighestBid(nodeB, "Mary", 41); // ebay style, now updated to the highest counter offer
          return null;
        });

        // make sure that first bidder still keeps the crown
        nodeB.tell(new OfferBid("c", 42), ActorRef.noSender());
        expectHighestBid(nodeB, "Mary", 42);
      }
    };
  }

  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestButDifferentInEachDc() {
    new TestSetup("test-2") {
      {
        // highest bid comes first
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 41), ActorRef.noSender());

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Mary", 41);
          expectHighestBid(nodeB, "Mary",
            41);
          return null;
        });

        // highest bid comes second
        nodeA.tell(new OfferBid("Paul", 50), ActorRef.noSender());
        nodeB.tell(new OfferBid("Kat", 60), ActorRef.noSender());

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Kat", 50);
          expectHighestBid(nodeB, "Kat", 50);
          return null;
        });
      }
    };
  }

  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestAndEqualButDifferentTimeInEachDc() {
    new TestSetup("test3") {
      {
        nodeA.tell(new OfferBid("Mary", 15), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", 12);

        nodeB.tell(new OfferBid("Paul", 15), ActorRef.noSender());

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Mary", 15);
          expectHighestBid(nodeB, "Mary", 15);
          return null;
        });
      }
    };
  }

  @Test
  public void eventuallyComeToAConsistentFinalResult() throws Exception {
    new TestSetup("test4") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        Thread.sleep(3000);

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeA.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  @Test
  public void eventuallyComeToAConsistentFinalResultWhenFinishingIsInitiatedOnNodeB() throws Exception {
    new TestSetup("test5") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        Thread.sleep(3000);

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeB.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  @Test
  public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception {
    new TestSetup("test6") {
      {
        PersistenceMultiDcTestKit.disableReplication(otherSystem,
          system);

        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        expectHighestBid(nodeA, "Mary", 12);

        PersistenceMultiDcTestKit.enableReplication(otherSystem, system);

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Paul", 42);
          return null;
        });
      }
    };
  }
}
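The tests above only pass if both data centers resolve conflicting bids to the same result, whatever order the events arrive in. The following standalone sketch illustrates that property without Akka or Cassandra. It is not the example's actual code: `BidOrderingSketch`, `State`, `apply` and `replay` are hypothetical names, and the first two comparison clauses and the state update are assumptions inferred from the test expectations (higher offer wins, then the earlier timestamp, then the DC name as the final tie-break).

```java
import java.time.Instant;

// Hypothetical, self-contained model of the bid ordering exercised by the tests.
final class BidOrderingSketch {

    static final class Bid {
        final String bidder;
        final int offer;
        final Instant timestamp;
        final String originDc;

        Bid(String bidder, int offer, Instant timestamp, String originDc) {
            this.bidder = bidder;
            this.offer = offer;
            this.timestamp = timestamp;
            this.originDc = originDc;
        }
    }

    // Current winner plus the price shown to bidders (second-price style).
    static final class State {
        final Bid highestBid;
        final int highestCounterOffer;

        State(Bid highestBid, int highestCounterOffer) {
            this.highestBid = highestBid;
            this.highestCounterOffer = highestCounterOffer;
        }
    }

    // Assumed ordering: offer, then earlier timestamp, then DC name.
    static boolean isHigherBid(Bid first, Bid second) {
        return first.offer > second.offer
            || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp))
            || (first.offer == second.offer
                && first.timestamp.equals(second.timestamp)
                && first.originDc.compareTo(second.originDc) < 0);
    }

    // Assumed update rule: a winning bid pushes the old winner's offer into the
    // counter offer; a losing bid can only raise the counter offer.
    static State apply(State state, Bid bid) {
        if (isHigherBid(bid, state.highestBid)) {
            return new State(bid, state.highestBid.offer);
        }
        return new State(state.highestBid, Math.max(state.highestCounterOffer, bid.offer));
    }

    // Applies the bids in the given order, as one replica would see them.
    static State replay(State initial, Bid... bids) {
        State state = initial;
        for (Bid bid : bids) {
            state = apply(state, bid);
        }
        return state;
    }

    public static void main(String[] args) {
        Bid start = new Bid("me", 12, Instant.ofEpochMilli(0), "");
        Bid mary = new Bid("Mary", 42, Instant.ofEpochMilli(1), "DC-A");
        Bid paul = new Bid("Paul", 41, Instant.ofEpochMilli(2), "DC-B");
        State initial = new State(start, start.offer);

        // Two replicas that see the same bids in opposite orders.
        State seenAtA = replay(initial, mary, paul);
        State seenAtB = replay(initial, paul, mary);
        System.out.println(seenAtA.highestBid.bidder + " " + seenAtA.highestCounterOffer);
        System.out.println(seenAtB.highestBid.bidder + " " + seenAtB.highestCounterOffer);
    }
}
```

Because the comparison is a total order over distinct bids and the update is order-insensitive for these inputs, both replays in `main` end with the same winner and counter offer. That is the behaviour the `awaitAssert` blocks in the listing wait for.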