Testing

The akka-persistence-multi-dc-testkit module provides a Simulator for unit testing replicated entities without using a database. Fine-grained control is available for inspecting side effects and simulating various scenario’s and replication orderings.

For more realistic but more heavy ‘integration test’-style tests you might want to consider writing a Multi JVM Test in addition to simulator-based tests.

Dependency

To use the simulator and the Cassandra launcher described below you will need the akka-persistence-multi-dc-testkit dependency:

sbt
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html
"com.lightbend.akka" %% "akka-persistence-multi-dc-testkit" % "1.1.16" % Test
Gradle
// Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html
dependencies {
  testCompile group: 'com.lightbend.akka', name: 'akka-persistence-multi-dc-testkit_2.11', version: '1.1.16'
}
Maven
<!-- Add Lightbend Platform to your build as documented at https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-started/subscription-and-credentials.html -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-persistence-multi-dc-testkit_2.11</artifactId>
  <version>1.1.16</version>
  <scope>test</scope>
</dependency>

Before you can access this library, you’ll need to configure the Lightbend repository and credentials in your build.

Basic simulator usage

A simple use of the simulator might look like this:

Scala
import BuggyEntity._
val simulator = createSimulator(() => new BuggyEntity(), selfDc = allDcs.head, allDcs)

simulator.runCommand(Append("Hello"))
simulator.replicatedEvent(Appended(" World"))
simulator.runCommand(Append("foo"), Append("bar"))

simulator.state() should ===("Hello Worldfoobar")
simulator.events().length should ===(4)
Java
Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity());
simulator.runCommand(new Commands.Append("Hello"));
simulator.replicatedEvent(new Events.Appended(" World"));
simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar"));

assertEquals("Hello Worldfoobar", simulator.state());
assertEquals(4, simulator.getEvents().size());

The createSimulator helper method is a convenience method that makes it easy to create a Simulator under a mostly-empty ActorSystem and makes sure its lifecycle is correctly tied into your test framework of choice. For example, under scalatestjunit the complete test might look like:

Scala
import scala.concurrent.{ Future, Promise }
import akka.actor.ActorRef
import akka.persistence.multidc.scaladsl.{ Effect, EffectFactories, ReplicatedEntity }
import org.scalatest.{ Matchers, WordSpecLike }
class SimulatorSpec extends WordSpecLike with ScalatestSimulator with Matchers {
  import SimulatorSpec._

  val allDcs = Set("DC-A", "DC-B")

  "The simulator" should {
    "allow writing a test controlling replication manually" in {
      import BuggyEntity._
      val simulator = createSimulator(() => new BuggyEntity(), selfDc = allDcs.head, allDcs)

      simulator.runCommand(Append("Hello"))
      simulator.replicatedEvent(Appended(" World"))
      simulator.runCommand(Append("foo"), Append("bar"))

      simulator.state() should ===("Hello Worldfoobar")
      simulator.events().length should ===(4)
    }
  }
}
Java
import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static org.junit.Assert.assertEquals;

public class BasicTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testBasicSimulator() {
    Simulator<Commands.Command, Events.Event, String> simulator = createSimulator(() -> new BuggyEntity());
    simulator.runCommand(new Commands.Append("Hello"));
    simulator.replicatedEvent(new Events.Appended(" World"));
    simulator.runCommand(new Commands.Append("foo"), new Commands.Append("bar"));

    assertEquals("Hello Worldfoobar", simulator.state());
    assertEquals(4, simulator.getEvents().size());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

For the sake of completeness, the entity under test looks like:

Scala
/**
 * This entity is buggy because replica's will end up in different states when events are persisted
 * concurrently in different data centres.
 */
object BuggyEntity {

  sealed trait Command
  case class Append(string: String) extends Command
  case class Get(replyTo: ActorRef) extends Command

  sealed trait Event
  case class Appended(string: String) extends Event

  type State = String
}

class BuggyEntity extends ReplicatedEntity[BuggyEntity.Command, BuggyEntity.Event, BuggyEntity.State] {
  import BuggyEntity._
  override val initialState = ""

  override def commandHandler = CommandHandler { (ctx, state, cmd) =>
    cmd match {
      case Get(replyTo) =>
        replyTo ! state
        Effect.none
      case Append(string) =>
        Effect.persist(Appended(string))
    }
  }

  override def eventHandler(state: State, event: Event): State = event match {
    case Appended(string) => state + string
  }
}
Java
import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

/**
 * This entity is buggy because replica's will end up in different states when events are persisted
 * concurrently in different data centres.
 */
public class BuggyEntity extends ReplicatedEntity<Commands.Command, Events.Event, String> {
  @Override
  public String initialState() {
    return "";
  }

  @Override
  public CommandHandler<Commands.Command, Events.Event, String> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
        .matchCommand(Commands.Get.class, (ctx, state, get) -> {
          ctx.getSender().tell(state, ctx.getSelf());
          return Effect().none();
        })
        .matchCommand(Commands.Append.class, (ctx, state, append) -> {
          return Effect().persist(new Events.Appended(append.string));
        })
        .build();
  }

  @Override
  public EventHandler<Events.Event, String> eventHandler() {
    return eventHandlerBuilder(Events.Event.class)
        .matchEvent(Events.Appended.class, (state, appended) -> {
          return state + appended.string;
        })
        .build();
  }
}

Testing side effects

To demonstrate some more advanced testing scenario’s, we introduce a ReliableDeliverer actor implementation which we will test. This actor implements an “at-least-once” delivery: after accepting a task and persisting the intent of performing some async action, we immediately send a DeliveryScheduled acknowledgement to the sender. Then we attempt to perform some async call, retrying in case of restarts, sending a final DeliveryAcknowledged at a best-effort basis. Typically you might also want to trigger retries with a Timer, but this has been left out of this example for brevity.

The actor under test looks like this:

Scala
object ReliableDeliverer {
  sealed trait Command
  case class Deliver(payload: String) extends Command
  sealed abstract class InternalCommand extends Command
  case class ActionPerformed(payload: String) extends InternalCommand

  case object DeliveryScheduled
  case object DeliveryAcknowledged

  sealed trait Event
  case class Promised(payload: String) extends Event
  case class Acknowledged(payload: String) extends Event

  case class State(pendingPayloads: List[String])
}
class ReliableDeliverer(useThisDcDuringRecovery: Boolean = true) extends ReplicatedEntity[ReliableDeliverer.Command, ReliableDeliverer.Event, ReliableDeliverer.State] {
  import akka.pattern.pipe
  import ReliableDeliverer._

  // Transient mutable state (not part of the state managed by event sourcing)
  // which means it will be lost when the actor is restarted, and delivery acknowledgements
  // might go missing even though the delivery is in fact retried.
  var acknowledgementsTo: Map[String, ActorRef] = Map.empty

  override val initialState = State(List.empty)

  override def commandHandler = CommandHandler { (ctx, state, cmd) =>
    cmd match {
      case Deliver(payload) =>
        import ctx.dispatcher
        acknowledgementsTo = acknowledgementsTo.updated(payload, ctx.sender())
        Effect.persist(Promised(payload))
          .andThen(_ => performAsyncCall(payload).pipeTo(ctx.self))
          .andThen(_ => ctx.sender() ! DeliveryScheduled)
      case ActionPerformed(payload) =>
        Effect.persist(Acknowledged(payload))
          .andThen { _ =>
            acknowledgementsTo.get(payload).foreach(_ ! DeliveryAcknowledged)
            acknowledgementsTo = acknowledgementsTo - payload
          }
    }
  }

  override def eventHandler(state: State, event: Event): State = event match {
    case Promised(payload) =>
      state.copy(state.pendingPayloads :+ payload)
    case Acknowledged(payload) =>
      state.copy(state.pendingPayloads.filter(_ != payload))
  }

  override def recoveryCompleted(ctx: ActorContext, state: State): Effect[Event, State] = {
    if (useThisDcDuringRecovery) {
      import ctx.dispatcher
      state.pendingPayloads.foreach(payload => performAsyncCall(payload).pipeTo(ctx.self))
    }

    Effect.none
  }

  def performAsyncCall(payload: String): Future[ActionPerformed] = ???
}
Java
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import akka.Done;
import akka.actor.ActorRef;
import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.Effect;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

import static akka.pattern.PatternsCS.pipe;

public class ReliableDeliverer extends ReplicatedEntity<Commands.Command, Events.Event, List<String>> {
  private boolean useThisDcDuringRecovery;

  // Transient mutable state (not part of the state managed by event sourcing)
  private Map<String, ActorRef> acknowledgementsTo = new HashMap<>();

  public ReliableDeliverer(boolean useThisDcDuringRecovery) {
    this.useThisDcDuringRecovery = useThisDcDuringRecovery;
  }

  @Override
  public List<String> initialState() {
    return new ArrayList<>();
  }

  @Override
  public CommandHandler<Commands.Command, Events.Event, List<String>> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
      .matchCommand(Commands.Deliver.class, (ctx, state, deliver) -> {
        acknowledgementsTo.put(deliver.payload, ctx.getSender());
        return Effect().persist(new Events.Promised(deliver.payload))
          .andThen(s -> pipe(performAsyncCall(deliver.payload), ctx.getDispatcher()).to(ctx.getSelf()))
          .andThen(s -> ctx.getSender().tell(Messages.DeliveryScheduled.getInstance(), ctx.getSelf()));
      })
      .matchCommand(Commands.ActionPerformed.class, (ctx, state, actionPerformed) -> {
        return Effect().persist(new Events.Acknowledged(actionPerformed.payload))
          .andThen(s -> {
            ActorRef sender = acknowledgementsTo.get(actionPerformed.payload);
            if (sender != null) {
              sender.tell(Messages.DeliveryAcknowledged.getInstance(), ActorRef.noSender());
              acknowledgementsTo.remove(actionPerformed.payload);
            }
          });
      })
      .build();
  }

  @Override
  public EventHandler<Events.Event, List<String>> eventHandler() {
    return eventHandlerBuilder(Events.Event.class)
      .matchEvent(Events.Promised.class, (state, event) -> {
        List<String> newState = new ArrayList<>(state);
        newState.add(event.payload);
        return newState;
      })
      .matchEvent(Events.Acknowledged.class, (state, event) -> {
        List<String> newState = new ArrayList<>(state);
        newState.remove(event.payload);
        return newState;
      })
      .build();
  }

  @Override
  public Effect<Events.Event, List<String>> recoveryCompleted(ActorContext ctx, List<String> state) {
    if (useThisDcDuringRecovery) {
      for (String payload : state) {
        pipe(performAsyncCall(payload), ctx.getDispatcher()).to(ctx.getSelf());
      }
    }

    return Effect().none();
  }

  CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
    throw new UnsupportedOperationException();
  }
}

Testing that it indeed sends both a DeliveryScheduled and a DeliveryAcknowledged back to the sender for the “happy path” scenario can be demonstrated with the following test:

Scala
import ReliableDeliverer._
import akka.testkit.CallingThreadDispatcher

val simulator = createSimulator(() => new ReliableDeliverer() {
  override def performAsyncCall(payload: String): Future[ActionPerformed] = Future.successful(ActionPerformed(payload))
}, selfDc = allDcs.head, allDcs, Some(CallingThreadDispatcher.Id))
simulator.runCommand(Deliver("Some Payload"))
simulator.responses shouldBe Seq(DeliveryScheduled)
simulator.selfMessages shouldBe Seq(ActionPerformed("Some Payload"))

simulator.runCommand(ActionPerformed("Some Payload"))
simulator.responses shouldBe Seq(DeliveryScheduled, DeliveryAcknowledged)
Java
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import akka.Done;
import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer;
import akka.persistence.multidc.testkit.reliabledeliverer.Commands;
import akka.persistence.multidc.testkit.reliabledeliverer.Events;
import akka.persistence.multidc.testkit.reliabledeliverer.Messages;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import static org.junit.Assert.assertTrue;

public class SenderTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testObserveResponseToSender() {
    Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) {
      @Override
      CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
        return CompletableFuture.completedFuture(new Commands.ActionPerformed(payload));
      }
    });
    simulator.runCommand(new Commands.Deliver("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance()));
    assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));

    simulator.runCommand(new Commands.ActionPerformed("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryAcknowledged.getInstance()));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

Testing recovery

Of course it is important to test the logic of your replicated entity is also consistent after recovery. The simulator provides a convenient .restart() method to start a new instance of the replicated entity and apply the recovery logic. Do note that this does not actually restart the actor system used for hosting the tests.

To demonstrate testing recovery, we can for example show the recovery behavior of the ReliableDeliverer actor introduced in the previous section:

Scala
import ReliableDeliverer._

var futureToReturn = Promise[ActionPerformed]().future

val simulator = createSimulator(() => new ReliableDeliverer() {
  override def performAsyncCall(payload: String): Future[ActionPerformed] = futureToReturn
}, selfDc = allDcs.head, allDcs, Some(CallingThreadDispatcher.Id))
simulator.runCommand(Deliver("Some Payload"))
simulator.responses shouldBe Seq(DeliveryScheduled)

futureToReturn = Future.successful(ActionPerformed("Some Payload"))
simulator.restart()

simulator.selfMessages shouldBe Seq(ActionPerformed("Some Payload"))
Java
package akka.persistence.multidc.testkit.reliabledeliverer;

import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.persistence.multidc.testkit.reliabledeliverer.ReliableDeliverer;
import akka.persistence.multidc.testkit.reliabledeliverer.Commands;
import akka.persistence.multidc.testkit.reliabledeliverer.Events;
import akka.persistence.multidc.testkit.reliabledeliverer.Messages;
import akka.testkit.CallingThreadDispatcher;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;

public class RecoveryTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor, String dispatcherId) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.of(dispatcherId));
  }

  @Test
  public void testObserveResponseToSender() {
    final Holder<CompletableFuture<Commands.ActionPerformed>> futureToReturn = new Holder<>(new CompletableFuture<>());

    Simulator<Commands.Command, Events.Event, List<String>> simulator = createSimulator(() -> new ReliableDeliverer(true) {
      @Override
      CompletableFuture<Commands.ActionPerformed> performAsyncCall(String payload) {
        System.out.println("Returning " + futureToReturn.value);
        return futureToReturn.value;
      }
    }, CallingThreadDispatcher.Id());
    simulator.runCommand(new Commands.Deliver("Some Payload"));
    assertTrue(simulator.getResponses().contains(Messages.DeliveryScheduled.getInstance()));
    assertFalse(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));

    futureToReturn.value = CompletableFuture.completedFuture(new Commands.ActionPerformed("Some Payload"));
    simulator.restart();

    assertTrue(simulator.getSelfMessages().contains(new Commands.ActionPerformed("Some Payload")));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

Snapshotting

When snapshots are used to make recovery more efficient:

Scala
object Snapshotting {
  sealed trait Command
  case class Append(string: String) extends Command
  case object Get extends Command
  case object EventsHandled extends Command

  sealed trait Event
  case class Appended(string: String) extends Event

  type State = String
}
class Snapshotting extends ReplicatedEntity[Snapshotting.Command, Snapshotting.Event, Snapshotting.State] {
  import Snapshotting._

  override val initialState = ""

  // Sneaky unmanaged state:
  var eventsHandled = 0

  override def commandHandler = CommandHandler { (ctx, state, cmd) =>
    cmd match {
      case Get =>
        ctx.sender() ! state
        Effect.none
      case EventsHandled =>
        ctx.sender() ! eventsHandled
        Effect.none
      case Append(string) =>
        Effect.persist(Appended(string))
    }
  }

  override def eventHandler(state: State, event: Event): State = event match {
    case Appended(string) =>
      eventsHandled = eventsHandled + 1
      state + string
  }
}
Java
package akka.persistence.multidc.testkit.snapshotting;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

public class Snapshotting extends ReplicatedEntity<Commands.Command, Events.Appended, String> {

  // Sneaky unmanaged state:
  private Integer eventsHandled = 0;

  @Override
  public String initialState() {
    return "";
  }

  @Override
  public CommandHandler<Commands.Command, Events.Appended, String> commandHandler() {
    return commandHandlerBuilder(Commands.Command.class)
      .matchCommand(Commands.Get.class, (ctx, state, get) -> {
        ctx.getSender().tell(state, ctx.getSelf());
        return Effect().none();
      })
      .matchCommand(Commands.EventsHandled.class, (ctx, state, eventsHandled) -> {
        ctx.getSender().tell(this.eventsHandled, ctx.getSelf());
        return Effect().none();
      })
      .matchCommand(Commands.Append.class, (ctx, state, append) -> {
        return Effect().persist(new Events.Appended(append.string));
      })
      .build();
  }

  @Override
  public EventHandler<Events.Appended, String> eventHandler() {
    return eventHandlerBuilder(Events.Appended.class)
      .matchEvent(Events.Appended.class, (state, appended) -> {
        eventsHandled++;
        return state + appended.string;
      })
      .build();
  }
}

The simulator correctly takes this into account and replays the events starting from the point of the snapshot:

Scala
import Snapshotting._
val simulator = createSimulator(() => new Snapshotting(), selfDc = allDcs.head, allDcs)
simulator.runCommand(Append("First"))
simulator.takeSnapshot()
simulator.runCommand(Append("Second"))

simulator.runCommand(Get)
simulator.runCommand(EventsHandled)
simulator.responses() shouldBe Seq("FirstSecond", 2)

simulator.restart()

simulator.runCommand(Get)
simulator.runCommand(EventsHandled)
// TODO perhaps reading the responses should clear them as a side effect, like expectMessage etc
simulator.responses() shouldBe Seq("FirstSecond", 2, "FirstSecond", 1)
Java
package akka.persistence.multidc.testkit.snapshotting;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import static org.junit.Assert.assertTrue;

public class SnapshottingTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testSnapshotting() {
    Simulator<Commands.Command, Events.Appended, String> simulator = createSimulator(() -> new Snapshotting());
    simulator.runCommand(new Commands.Append("First"));
    simulator.takeSnapshot();
    simulator.runCommand(new Commands.Append("Second"));

    simulator.runCommand(new Commands.Get());
    assertTrue(simulator.responses().contains("FirstSecond"));
    simulator.runCommand(new Commands.EventsHandled());
    assertTrue(simulator.responses().contains(2));
    simulator.clearResponses();

    simulator.restart();

    simulator.runCommand(new Commands.Get());
    assertTrue(simulator.responses().contains("FirstSecond"));
    simulator.runCommand(new Commands.EventsHandled());
    assertTrue(simulator.responses().contains(1));
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

Timeouts and Timers

In the following example we have a test that shuts itself down when its receive timeout has been triggered:

Scala
class StopWhenIdle extends ReplicatedEntity[Unit, Unit, Unit] {
  override def initialState: Unit = ()

  override def commandHandler = CommandHandler {
    (_, _, _) => Effect.unhandled
  }.onReceiveTimeout {
    case (ctx, state) => Effect.stop
  }

  override def eventHandler(state: Unit, event: Unit): Unit = ()
}
Java
package akka.persistence.multidc.testkit.stopwhenidle;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

public class StopWhenIdle extends ReplicatedEntity<Void, Void, Void> {
  @Override
  public Void initialState() {
    return null;
  }

  @Override
  public CommandHandler<Void, Void, Void> commandHandler() {
    return commandHandlerBuilder(Void.class)
      .onReceiveTimeout((ctx, state, rt) -> Effect().stop())
      .build();
  }

  @Override
  public EventHandler<Void, Void> eventHandler() {
    return null;
  }
}

In its test, a receive timeout can be explicitly triggered:

Scala
val simulator = createSimulator(() => new StopWhenIdle(), selfDc = allDcs.head, allDcs)
simulator.triggerReceiveTimeout()
simulator.stopped() shouldBe true
Java
package akka.persistence.multidc.testkit.stopwhenidle;

import akka.actor.ActorSystem;
import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.Simulator;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import static org.junit.Assert.assertTrue;

public class ReceiveTimeoutTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));

  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testReceiveTimeout() {
    Simulator simulator = createSimulator(() -> new StopWhenIdle());
    simulator.triggerReceiveTimeout();
    assertTrue(simulator.stopped());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

When a replicated entity uses the powerful Timers API:

Scala
object Ticking {
  case object TickKey

  sealed trait Command
  case object FirstTick extends Command

  case object Tick
}

class Ticking extends ReplicatedEntity[Ticking.Command, Int, Int] {
  import scala.concurrent.duration._
  import Ticking._

  override val initialState = 0

  override def commandHandler = CommandHandler { (ctx, state, cmd) =>
    cmd match {
      case FirstTick =>
        ctx.timers.startPeriodicTimer(TickKey, Tick, 1.second)
        Effect.none
    }
  }.onTimer[Tick.type] { (ctx, state, tick) =>
    tick match {
      case Tick => Effect.persist(1)
    }
  }

  override def eventHandler(state: Int, event: Int): Int = state + event
}
Java
package akka.persistence.multidc.testkit.timers;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

import akka.persistence.multidc.javadsl.CommandHandler;
import akka.persistence.multidc.javadsl.EventHandler;
import akka.persistence.multidc.javadsl.ReplicatedEntity;

import akka.persistence.multidc.testkit.timers.Tick;
import akka.persistence.multidc.testkit.timers.FirstTick;

public class Ticking extends ReplicatedEntity<FirstTick, Integer, Integer> {
  public static final Object TICK_KEY = "TickKey";

  @Override
  public Integer initialState() {
    return 0;
  }

  @Override
  public CommandHandler<FirstTick, Integer, Integer> commandHandler() {
    return commandHandlerBuilder(FirstTick.class)
      .matchCommand(FirstTick.class, (ctx, state, firstTick) -> {
        // FIXME Java API for setting timers, not taking a FiniteDuration
        ctx.getTimers().startPeriodicTimer(TICK_KEY, Tick.getInstance(), Duration.create(1, TimeUnit.SECONDS));
        return Effect().none();
      })
      .onTimer(Tick.class, (ctx, state, tick) -> {
        return Effect().persist(1);
      })
      .build();
  }

  @Override
  public EventHandler<Integer, Integer> eventHandler() {
    return eventHandlerBuilder(Integer.class)
      .matchAny((state, event) -> state + event);
  }
}

The simulator API allows timers to be inspected and triggered:

Scala
import Ticking._
val simulator = createSimulator(() => new Ticking(), selfDc = allDcs.head, allDcs)
simulator.runCommand(FirstTick)

simulator.isTimerActive(TickKey) shouldBe true
simulator.triggerTimer(TickKey)

simulator.isTimerActive(TickKey) shouldBe true
simulator.state shouldBe 1
Java
package akka.persistence.multidc.testkit.timers;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;

import akka.actor.ActorSystem;

import akka.persistence.multidc.javadsl.ReplicatedEntity;
import akka.persistence.multidc.testkit.timers.Tick;
import akka.persistence.multidc.testkit.timers.FirstTick;

import akka.testkit.javadsl.TestKit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.persistence.multidc.testkit.Simulator;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;

public class TimerTest extends JUnitSuite {
  private static ActorSystem system;

  @BeforeClass
  public static void startActorSystem() {
    system = ActorSystem.create("test");
  }

  private String selfDc = "DC-A";
  private Set<String> allDcs = new HashSet<>(Arrays.asList(selfDc, "DC-B"));


  public <C, E, S> Simulator<C, E, S> createSimulator(Supplier<ReplicatedEntity<C, E, S>> entityCtor) {
    return Simulator.<C, E, S>create(entityCtor, system, selfDc, allDcs, Optional.empty());
  }

  @Test
  public void testTimers() {
    Simulator<FirstTick, Integer, Integer> simulator = createSimulator(() -> new Ticking());
    simulator.runCommand(FirstTick.getInstance());

    assertTrue(simulator.isTimerActive(Ticking.TICK_KEY));
    simulator.triggerTimer(Ticking.TICK_KEY);

    assertTrue(simulator.isTimerActive(Ticking.TICK_KEY));
    assertEquals(Integer.valueOf(1), simulator.state());
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
  }
}

Integration testing

It is also possible to test your replicated entity using an integration test. Such a test starts a number of actor systems, and provides some tools to simulate failures. These would typically connect to a real cassandra instance, making them more faithful but also rather heavy.

To make it easier to spawn a temporary Cassandra instance for testing akka-persistence-multi-dc-testkit provides a CassandraLifecycle utility. This can be relatively easily hooked into your test framework, for example for scalatest your could use a traitJUnit you could initialize it like this:

Scala
import akka.testkit.TestKitBase
import akka.persistence.cassandra.testkit.CassandraLauncher
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite

trait CassandraLifecycleScalatest extends BeforeAndAfterAll { this: TestKitBase with Suite =>

  import CassandraLifecycle._

  def systemName: String

  def cassandraConfigResource: String = CassandraLauncher.DefaultTestConfigResource

  override protected def beforeAll(): Unit = {
    startCassandra(systemName, cassandraConfigResource)
    awaitPersistenceInit(system)
    super.beforeAll()
  }

  override protected def afterAll(): Unit = {
    shutdown(system, verifySystemShutdown = true)
    stopCassandra()
    super.afterAll()
  }
}
Java
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.persistence.multidc.testkit.CassandraLifecycle;
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;


public class AuctionExampleTest extends JUnitSuite {
  private static ActorSystem system;
  private static ActorSystem otherSystem;

  @BeforeClass
  public static void setupSystems() {
    system = createSystem("DC-A");
    CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource());
    CassandraLifecycle.awaitPersistenceInit(system);

    otherSystem = createSystem("DC-B");
    CassandraLifecycle.awaitPersistenceInit(otherSystem);
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
    TestKit.shutdownActorSystem(otherSystem, true);
    CassandraLifecycle.stopCassandra();
  }
}

Combined with PersistenceMultiDcTestKit you can write tests that verify the behavior when replication between systems is temporarily suspended. You will need to add persistenceMultiDcTestSettings to your actor system configuration.

Scala
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._

    // Include persistenceMultiDcTestSettings in your ActorSystem configuration
    // to allow suspending replication through PersistenceMultiDcTestKit
    .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs)))

"correctly resolve concurrent writes when replication is temporarily suspended" in
  new TestSetup("test6") {
    disableReplication(otherSystem, system)

    nodeA ! OfferBid("Mary", 42)
    nodeB ! OfferBid("Paul", 43)

    expectHighestBid(nodeA, "Mary", 12)

    enableReplication(otherSystem, system)

    awaitAssert {
      expectHighestBid(nodeA, "Paul", 42)
    }
  }
Java
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;

      // Include persistenceMultiDcTestSettings in your ActorSystem configuration
      // to allow suspending replication through PersistenceMultiDcTestKit
      .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B"))));

@Test
public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception {
  new TestSetup("test6") {
    {
      PersistenceMultiDcTestKit.disableReplication(otherSystem, system);

      nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
      nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

      expectHighestBid(nodeA, "Mary", 12);

      PersistenceMultiDcTestKit.enableReplication(otherSystem, system);

      awaitAssert(() -> {
        expectHighestBid(nodeA, "Paul", 42);
        return null;
      });

    }
  };
}

A full example, testing the auction actor described here, might look like this:

Scala
import java.time.Instant
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.persistence.multidc.PersistenceMultiDcSettings

import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import org.scalatest.BeforeAndAfter
import akka.persistence.multidc.testkit._
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._


import akka.testkit.ImplicitSender
import akka.testkit.TestKit

object AuctionExampleSpec {
  val allDcs = Seq("DC-A", "DC-B")

  val clusterConfig = ConfigFactory.parseString("""
    akka.actor.provider = "cluster"
    akka.remote.netty.tcp.port = 0
    akka.remote.classic.netty.tcp.port = 0
    akka.remote.artery.canonical.port = 0
    akka.remote.artery.canonical.hostname = 127.0.0.1
    akka.cluster.jmx.multi-mbeans-in-same-jvm = on

    # avoid eager init in tests
    akka.persistence.multi-data-center.hot-standby.enabled = off

    # FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
    akka.actor.allow-java-serialization = on
    akka.actor.warn-about-java-serializer-usage = off

    # speed up joining and such
    akka.cluster.gossip-interval = 500 ms
  """)

  def createSystem(dc: String = allDcs.head) =
    ActorSystem(
      "AuctionExampleSpec",
      clusterConfig
        .withFallback(CassandraLifecycle.config)
        // Include persistenceMultiDcTestSettings in your ActorSystem configuration
        // to allow suspending replication through PersistenceMultiDcTestKit
        .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs)))


}

class AuctionExampleSpec extends TestKit(AuctionExampleSpec.createSystem())
  with ImplicitSender
  with WordSpecLike
  with Matchers
  with BeforeAndAfterAll
  with BeforeAndAfter
  with CassandraLifecycleScalatest {
  import AuctionExampleSpec._

  val systemName = "AuctionExampleSpec"
  val settings = PersistenceMultiDcSettings(system)

  val otherSystem = createSystem(allDcs(1))
  val otherSettings = PersistenceMultiDcSettings(otherSystem)

  val allSystems = Seq(system, otherSystem)

  class TestSetup(testName: String) {
    val minimumBid = 12
    val auctionSetupA = AuctionSetup("bicycle", Bid("me", minimumBid, Instant.now(), ""),
      closingAt = Instant.now().plusSeconds(60), responsibleForClosing = true)
    val auctionSetupB = auctionSetupA.copy(responsibleForClosing = false)

    val nodeA = system.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupA, settings), s"auction-A-$testName")
    val nodeB = otherSystem.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupB, otherSettings), s"auction-B-$testName")

    def expectHighestBid(node: ActorRef): Bid = {
      node ! GetHighestBid
      expectMsgType[Bid]
    }
    def expectHighestBid(node: ActorRef, bidder: String, expected: MoneyAmount): Unit = {
      val bid = expectHighestBid(node)
      bid.offer shouldEqual expected
      bid.bidder shouldEqual bidder
    }
    def expectClosed(node: ActorRef): Unit = {
      node ! IsClosed
      expectMsg(true)
    }
  }

  "AuctionExample" should {
    "propagate highest bid to replicated actor" in new TestSetup("test1") {
      // simple bidding
      nodeA ! OfferBid("Mary", 42)
      expectHighestBid(nodeA, "Mary", minimumBid) // ebay style, still the minimum offer

      nodeA ! OfferBid("Paul", 41)
      expectHighestBid(nodeA, "Mary", 41) // ebay style, now updated to the highest counter offer

      awaitAssert {
        // check that results have propagated to b
        expectHighestBid(nodeB, "Mary", 41) // ebay style, now updated to the highest counter offer
      }

      // make sure that first bidder still keeps the crown
      nodeB ! OfferBid("c", 42)
      expectHighestBid(nodeB, "Mary", 42)
    }

    "eventually resolve conflicting bids during auction if bids are highest (but different) in each dc" in new TestSetup("test2") {
      // highest bid comes first
      nodeA ! OfferBid("Mary", 42)
      nodeB ! OfferBid("Paul", 41)

      awaitAssert {
        expectHighestBid(nodeA, "Mary", 41)
        expectHighestBid(nodeB, "Mary", 41)
      }

      // highest bid comes second
      nodeA ! OfferBid("Paul", 50)
      nodeB ! OfferBid("Kat", 60)

      awaitAssert {
        expectHighestBid(nodeA, "Kat", 50)
        expectHighestBid(nodeB, "Kat", 50)
      }
    }
    "eventually resolve conflicting bids during auction if bids are highest and equal (but different time) in each dc" in
      new TestSetup("test3") {
        nodeA ! OfferBid("Mary", 15)
        expectHighestBid(nodeA, "Mary", 12)

        nodeB ! OfferBid("Paul", 15)

        awaitAssert {
          expectHighestBid(nodeA, "Mary", 15)
          expectHighestBid(nodeB, "Mary", 15)
        }
      }

    "eventually come to a consistent final result" in
      new TestSetup("test4") {
        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)
        Thread.sleep(3000)

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeA ! Finish

        awaitAssert {
          expectClosed(nodeA)
          expectClosed(nodeB)
        }

        expectHighestBid(nodeA, "Paul", 43)
        expectHighestBid(nodeB, "Paul", 43)
      }

    "eventually come to a consistent final result when finishing is initiated on node B" in
      new TestSetup("test5") {
        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)
        Thread.sleep(3000)

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeB ! Finish

        awaitAssert {
          expectClosed(nodeA)
          expectClosed(nodeB)
        }

        expectHighestBid(nodeA, "Paul", 43)
        expectHighestBid(nodeB, "Paul", 43)
      }

    "correctly resolve concurrent writes when replication is temporarily suspended" in
      new TestSetup("test6") {
        disableReplication(otherSystem, system)

        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)

        expectHighestBid(nodeA, "Mary", 12)

        enableReplication(otherSystem, system)

        awaitAssert {
          expectHighestBid(nodeA, "Paul", 42)
        }
      }
  }

  after {
    allSystems.foreach { sys =>
      enableAllReplicationTo(sys)
    }
  }

  override protected def afterAll(): Unit = {
    allSystems.foreach(sys => shutdown(sys))
    super.afterAll()
  }
}
Java
import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.JAPI;
import akka.persistence.cassandra.testkit.CassandraLauncher;
import akka.persistence.multidc.PersistenceMultiDcSettings;
import akka.testkit.javadsl.TestKit;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.persistence.multidc.testkit.CassandraLifecycle;
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;


import scala.concurrent.duration.Duration;

import static org.junit.Assert.assertEquals;

public class AuctionExampleTest extends JUnitSuite {

  static class Bid implements Serializable {
    final String bidder;
    final int offer;
    final Instant timestamp;
    final String originDc;

    Bid(String bidder, int offer, Instant timestamp, String originDc) {
      this.bidder = bidder;
      this.offer = offer;
      this.timestamp = timestamp;
      this.originDc = originDc;
    }

    Bid withOffer(int offer) {
      return new Bid(bidder, offer, timestamp, originDc);
    }
  }


  // commands
  interface AuctionCommand extends Serializable {}
  static class OfferBid implements AuctionCommand {
    final String bidder;
    final int offer;
    public OfferBid(String bidder, int offer) {
      this.bidder = bidder;
      this.offer = offer;
    }
  }
  // An auction coordinator needs to schedule this event to each replica
  static class Finish implements AuctionCommand {
    static final Finish INSTANCE = new Finish();
    private Finish() {}
  }
  static class GetHighestBid implements AuctionCommand {
    static final GetHighestBid INSTANCE = new GetHighestBid();
    private GetHighestBid() {}
  }
  static class IsClosed implements AuctionCommand {
    static final IsClosed INSTANCE = new IsClosed();
    private IsClosed() {}
  }
  // Internal, should not be sent from the outside
  private static class Close implements AuctionCommand {
    static final Close INSTANCE = new Close();
    private Close() {}
  }

  // events
  interface AuctionEvent extends Serializable {}
  static class BidRegistered implements AuctionEvent {
    final Bid bid;
    public BidRegistered(Bid bid) {
      this.bid = bid;
    }
  }
  static class AuctionFinished implements AuctionEvent {
    final String atDc;
    public AuctionFinished(String atDc) {
      this.atDc = atDc;
    }
  }
  static class WinnerDecided implements AuctionEvent {
    final String atDc;
    final Bid winningBid;
    final int highestCounterOffer;
    public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) {
      this.atDc = atDc;
      this.winningBid = winningBid;
      this.highestCounterOffer = highestCounterOffer;
    }
  }


  static class AuctionState {

    final boolean stillRunning;
    final Bid highestBid;
    // in ebay style auctions, we need to keep track of current highest counter offer
    final int highestCounterOffer;
    final Set<String> finishedAtDc;

    AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) {
      this.stillRunning = stillRunning;
      this.highestBid = highestBid;
      this.highestCounterOffer = highestCounterOffer;
      this.finishedAtDc = finishedAtDc;
    }

    AuctionState withNewHighestBid(Bid bid) {
      assert(stillRunning);
      assert(isHigherBid(bid, highestBid));
      return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around
    }


    AuctionState withTooLowBid(Bid bid) {
      assert(stillRunning);
      assert(isHigherBid(highestBid, bid));
      return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc);
    }

    static Boolean isHigherBid(Bid first, Bid second) {
      return first.offer > second.offer ||
        (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
        // If timestamps are equal, choose by dc where the offer was submitted
        // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
        // particular DC would not be an advantage.
        (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0);
    }

    AuctionState addFinishedAtDc(String dc) {
      Set<String> s = new HashSet<>(finishedAtDc);
      s.add(dc);
      return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s));
    }

    public AuctionState close() {
      return new AuctionState(
          false,
          highestBid,
          highestCounterOffer,
          Collections.emptySet()
      );
    }

    public boolean isClosed() {
      return !stillRunning && finishedAtDc.isEmpty();
    }
  }

  private static final String FINISH_TIMER = "FinishTimer";

  static class AuctionSetup {
    final String name;
    final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner
    final Instant closingAt;
    final boolean responsibleForClosing;
    public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) {
      this.name = name;
      this.initialBid = initialBid;
      this.closingAt = closingAt;
      this.responsibleForClosing = responsibleForClosing;
    }
  }

  static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> {

    final AuctionSetup auctionSetup;

    public AuctionEntity(AuctionSetup auctionSetup) {
      this.auctionSetup = auctionSetup;
    }

    @Override
    public AuctionState initialState() {
      return new AuctionState(
          true,
          auctionSetup.initialBid,
          auctionSetup.initialBid.offer,
          Collections.emptySet());
    }

    @Override
    public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() {
      return byStateCommandHandlerBuilder(AuctionState.class)
          .matchState(state -> state.stillRunning, running())
          .matchAny(finished());
    }

    private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() {
      return commandHandlerBuilder(AuctionCommand.class)
          .matchCommand(OfferBid.class, (ctx, state, offerBid) -> {
            return Effect().persist(new BidRegistered(new Bid(offerBid.bidder, offerBid.offer,
                Instant.ofEpochMilli(currentTimeMillis()), selfDc())));
          }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> {
            ctx.getSender().tell(state.highestBid.withOffer(state.highestCounterOffer), getSelf());
            return Effect().none();
          }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> {
            log().info("Finish");
            return Effect().persist(new AuctionFinished(selfDc()));
          }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> {
            log().warning("Premature close");
            // Close should only be triggered when we have already finished
            return Effect().unhandled();
          }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> {
            ctx.getSender().tell(false, getSelf());
            return Effect().none();
          }).build();
    }

    private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() {
      return commandHandlerBuilder(AuctionCommand.class)
          .matchCommand(OfferBid.class, (ctx, state, offerBid) -> {
            // auction finished, no more bids accepted
            return Effect().unhandled();
          }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> {
            ctx.getSender().tell(state.highestBid, getSelf());
            return Effect().none();
          }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> {
            log().info("Finish");
            return Effect().persist(new AuctionFinished(selfDc()));
          }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> {
            log().info("Close");
            // TODO send email (before or after persisting)
            return Effect().persist(new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer));
          }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> {
              ctx.getSender().tell(state.isClosed(), getSelf());
              return Effect().none();
          }).build();
    }

    @Override
    public EventHandler<AuctionEvent, AuctionState> eventHandler() {
      return eventHandlerBuilder(AuctionEvent.class)
          .matchEvent(BidRegistered.class, (state, event) -> {
            if (AuctionState.isHigherBid(event.bid, state.highestBid)) {
              return state.withNewHighestBid(event.bid);
            } else {
              return state.withTooLowBid(event.bid);
            }
          }).matchEvent(AuctionFinished.class, (state, event) -> {
            if (state.isClosed())
              return state; // already closed
            else
              return state.addFinishedAtDc(event.atDc);
          }).matchEvent(WinnerDecided.class, (state, event) -> {
            return state.close();
          })
          .build();
    }

    private boolean shouldClose(AuctionState state) {
      return auctionSetup.responsibleForClosing && !state.isClosed() &&
          getAllDcs().equals(state.finishedAtDc);
    }

    private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) {
      if (shouldClose(state))
        ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf());
    }


    @Override
    public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx,
                              AuctionState state, AuctionEvent event) {
      if (event instanceof AuctionFinished && !state.isClosed()) {
        AuctionFinished finished = (AuctionFinished) event;
        log().info("AuctionFinished at {}, already finished at [{}]",
            finished.atDc, state.finishedAtDc);
        ActorContext actorCtx = ctx.actorContext();
        if (state.finishedAtDc.contains(getSelfDc())) {
          triggerCloseIfNeeded(actorCtx, state);
        } else {
          log().info("Sending finish to self");
          actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf());
        }
      }

      return Effect().none();
    }

    @Override
    public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) {
      triggerCloseIfNeeded(ctx, state);

      long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - currentTimeMillis();
      ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE,
          Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS));
      return Effect().none();
    }

  }

  static Props auctionProps(String pid, AuctionSetup auctionSetup, PersistenceMultiDcSettings settings) {
    return ReplicatedEntity.props(
        AuctionCommand.class,
        "auction",
        pid,
        () -> new AuctionEntity(auctionSetup),
        settings);
  }


  static private class TestSetup extends TestKit {
    final String testName;
    final int minimumBid = 12;
    final AuctionSetup auctionSetupA;
    final AuctionSetup auctionSetupB;
    final ActorRef nodeA;
    final ActorRef nodeB;

    TestSetup(String testName) {
      super(system);
      this.testName = testName;
      auctionSetupA = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(),
          ""), Instant.now().plusSeconds(60), true);
      auctionSetupB = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(),
          ""), Instant.now().plusSeconds(60), false);

      PersistenceMultiDcSettings settings = PersistenceMultiDcSettings.create(system);
      PersistenceMultiDcSettings otherSettings = PersistenceMultiDcSettings.create(otherSystem);

      nodeA = system.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupA, settings), "auction-A-" + testName);
      nodeB = otherSystem.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupB, otherSettings), "auction-B-" + testName);
    }

    Bid expectHighestBid(ActorRef node) {
        node.tell(GetHighestBid.INSTANCE, getTestActor());
        return expectMsgClass(Bid.class);
    }

    void expectHighestBid(ActorRef node, String bidder, int expected) {
        Bid bid = expectHighestBid(node);
        assertEquals(expected, bid.offer);
        assertEquals(bidder, bid.bidder);
    }

    void expectClosed(ActorRef node) {
      node.tell(IsClosed.INSTANCE, getTestActor());
      expectMsg(true);
    }

  }

  private static Config clusterConfig = ConfigFactory.parseString(String.join("\n",
    "akka.actor.provider = \"cluster\"",
    "akka.remote.netty.tcp.port = 0",
    "akka.remote.classic.netty.tcp.port = 0",
    "akka.remote.artery.canonical.port = 0",
    "akka.remote.artery.canonical.hostname = 127.0.0.1",
    "akka.cluster.jmx.multi-mbeans-in-same-jvm = on",
    // avoid eager init in tests
    "akka.persistence.multi-data-center.hot-standby.enabled = off",
    // FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
    "akka.actor.allow-java-serialization = on",
    "akka.actor.warn-about-java-serializer-usage = off",
    // speed up joining and such
    "akka.cluster.gossip-interval = 500 ms"));

  private static ActorSystem createSystem(String dc) {
    return ActorSystem.create(
      "auctionTest",
      clusterConfig
        .withFallback(CassandraLifecycle.config())
        // Include persistenceMultiDcTestSettings in your ActorSystem configuration
        // to allow suspending replication through PersistenceMultiDcTestKit
        .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B"))));

  }

  private static ActorSystem system;
  private static ActorSystem otherSystem;

  @BeforeClass
  public static void setupSystems() {
    system = createSystem("DC-A");
    CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource());
    CassandraLifecycle.awaitPersistenceInit(system);

    otherSystem = createSystem("DC-B");
    CassandraLifecycle.awaitPersistenceInit(otherSystem);
  }

  @AfterClass
  public static void tearDownSystems() {
    TestKit.shutdownActorSystem(system, true);
    TestKit.shutdownActorSystem(otherSystem, true);
    CassandraLifecycle.stopCassandra();
  }

  @Test
  public void propagateHighestBidToReplicatedActor() {
    new TestSetup("test-1") {
      {
        // simple bidding
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", minimumBid); // ebay style, still the minimum offer

        nodeA.tell(new OfferBid("Paul", 41), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", 41); // ebay style, now updated to the highest counter offer

        awaitAssert(
            () -> {
              expectHighestBid(nodeB, "Mary", 41);
              return null;
            });

        awaitAssert(
            () -> {
              // check that results have propagated to b
              expectHighestBid(nodeB, "Mary", 41); // ebay style, now updated to the highest counter offer
              return null;
            }
        );

        // make sure that first bidder still keeps the crown
        nodeB.tell(new OfferBid("c", 42), ActorRef.noSender());
        expectHighestBid(nodeB, "Mary", 42);
      }
    };
  }


  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestButDifferentInEachDc() {
    new TestSetup("test-2") {
      {
        // highest bid comes first
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 41), ActorRef.noSender());

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Mary", 41);
          expectHighestBid(nodeB, "Mary", 41);
          return null;
        });


        // highest bid comes second
        nodeA.tell(new OfferBid("Paul", 50), ActorRef.noSender());
        nodeB.tell(new OfferBid("Kat", 60), ActorRef.noSender());

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Kat", 50);
          expectHighestBid(nodeB, "Kat", 50);
          return null;
        });
      }
    };
  }

  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestAndEqualButDifferentTimeInEachDc() {
    new TestSetup("test3") {
      {
        nodeA.tell(new OfferBid("Mary", 15), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", 12);

        nodeB.tell(new OfferBid("Paul", 15), ActorRef.noSender());

        awaitAssert(() -> {
            expectHighestBid(nodeA, "Mary", 15);
            expectHighestBid(nodeB, "Mary", 15);
            return null;
        });
      }
    };
  }

  @Test
  public void eventuallyComeToAConsistentFinalResult() throws Exception {
    new TestSetup("test4") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());
        Thread.sleep(3000);

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeA.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  @Test
  public void eventuallyComeToAConsistentFinalResultWhenFinishingIsInitiatedOnNodeB() throws Exception {
    new TestSetup("test5") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());
        Thread.sleep(3000);

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeB.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  @Test
  public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception {
    new TestSetup("test6") {
      {
        PersistenceMultiDcTestKit.disableReplication(otherSystem, system);

        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        expectHighestBid(nodeA, "Mary", 12);

        PersistenceMultiDcTestKit.enableReplication(otherSystem, system);

        awaitAssert(() -> {
          expectHighestBid(nodeA, "Paul", 42);
          return null;
        });

      }
    };
  }
}

Multi-JVM testing

A Multi JVM Test is even more heavyweight, as it will spawn an entire new JVM for each cluster node. This does allow for even more production-like tests, however bear in mind that it is also harder to interpret any failures.

import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.Cluster
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }
import akka.persistence.multidc.PersistenceMultiDcSettings
import akka.persistence.multidc.scaladsl.ReplicatedEntity

import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import akka.testkit.TestProbe
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.persistence.multidc.testkit.CassandraLifecycle
import akka.persistence.multidc.testkit.ContactPoints
import akka.persistence.multidc.internal.MultiNodeClusterSpec
import akka.persistence.multidc.testkit.MultiNodeSessionProvider

object ShardedReplicatedEntitySpec extends MultiNodeConfig {
  final val CassPort = 50100

  val dc1node1 = role("dc1-node1")
  val dc1node2 = role("dc1-node2")
  val dc2node3 = role("dc2-node3")
  val dc2node4 = role("dc2-node4")

  commonConfig(ConfigFactory.parseString(
    s"""
      akka.loglevel = DEBUG
      akka.actor.provider = cluster
      akka.testconductor.barrier-timeout = 60 s
      cassandra-journal-multi-dc.keyspace = ShardedReplicatedEntitySpec
      cassandra-snapshot-store.keyspace = ShardedReplicatedEntitySpecSnapshot
      cassandra-journal-multi-dc.port = $CassPort
      cassandra-snapshot-store.port = $CassPort
      cassandra-journal-multi-dc.session-provider = ${classOf[MultiNodeSessionProvider].getName}
      cassandra-snapshot-store.session-provider = ${classOf[MultiNodeSessionProvider].getName}

      # avoid eager init in tests
      akka.persistence.multi-data-center.hot-standby.enabled = off

      # FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
      akka.actor.allow-java-serialization = on
      akka.actor.warn-about-java-serializer-usage = off

      # speed up joining and such
      akka.cluster.gossip-interval = 500 ms
    """).withFallback(CassandraLifecycle.config))

  nodeConfig(dc1node1, dc1node2) {
    ConfigFactory.parseString(
      """
        |akka.cluster.multi-data-center.self-data-center = DC-1
        |akka.persistence.multi-data-center.all-data-centers = [DC-1, DC-2]
      """.stripMargin)
  }

  nodeConfig(dc2node3, dc2node4) {
    ConfigFactory.parseString(
      """
        |akka.cluster.multi-data-center.self-data-center = DC-2
        |akka.persistence.multi-data-center.all-data-centers = [DC-1, DC-2]
      """.stripMargin)
  }

  case class CommandEnvelope(id: Long, command: Command)

  sealed trait Command
  case class UsageReport(usage: Long, timestamp: Long) extends Command
  case object GetUsage extends Command
  case object Stop extends Command

  sealed trait Event
  case class UsageReported(usage: Long, timestamp: Long) extends Event

  case class State(totalUsage: Long, timestamp: Long)

  def serverProps()(implicit system: ActorSystem): Props =
    ReplicatedEntity.clusterShardingProps("server", () => new Server, PersistenceMultiDcSettings(system))

  class Server() extends ReplicatedEntity[Command, Event, State] {
    override def initialState: State = State(0, -1)

    override def commandHandler = CommandHandler {
      case (ctx, _, UsageReport(usage, ts)) => Effect.persist(UsageReported(usage, ts))
      case (ctx, state, GetUsage) =>
        ctx.sender() ! state
        Effect.none
      case (ctx, _, Stop) =>
        ctx.self ! PoisonPill
        Effect.none
    }

    override def eventHandler(currentState: State, event: Event): State = event match {
      case UsageReported(usage, ts) => State(currentState.totalUsage + usage, ts)
    }
  }
}

class ShardedReplicatedEntitySpecMultiJvmNode1 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode2 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode3 extends ShardedReplicatedEntitySpec
class ShardedReplicatedEntitySpecMultiJvmNode4 extends ShardedReplicatedEntitySpec

abstract class ShardedReplicatedEntitySpec extends MultiNodeSpec(ShardedReplicatedEntitySpec)
  with MultiNodeClusterSpec with WordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll {
  import ShardedReplicatedEntitySpec._
  import CassandraLifecycle._

  val extractEntityId: ShardRegion.ExtractEntityId = {
    case CommandEnvelope(id, command) => (id.toString, command)
  }

  val numberOfShards = 100
  val extractShardId: ShardRegion.ExtractShardId = {
    case CommandEnvelope(id, _) => (id % numberOfShards).toString
    case ShardRegion.StartEntity(id) =>
      // StartEntity is used by remembering entities feature
      (id.toLong % numberOfShards).toString
  }

  override def beforeAll() = {
    val CassHost = node(dc1node1).address.host.get

    ContactPoints(system).completeAddressDefaultClusterId(CassHost)
    runOn(dc1node1) {
      startCassandra(CassHost, CassPort, system.name)
      awaitPersistenceInit(system)
    }
    enterBarrier("cassandra-init")

    runOn(dc1node2, dc2node3, dc2node4) {
      awaitPersistenceInit(system)
    }
    enterBarrier("persistence-init")
  }

  "sharded replicated entity" should {
    "replicate events" in {
      runOn(dc1node1) {
        Cluster(system).join(node(dc1node1).address)
      }

      runOn(dc2node3) {
        Cluster(system).join(node(dc2node3).address)
      }

      enterBarrier("cluster-init-1")

      runOn(dc1node2) {
        Cluster(system).join(node(dc1node1).address)
      }

      runOn(dc2node4) {
        Cluster(system).join(node(dc2node3).address)
      }

      enterBarrier("cluster-init-2")

      val region = ClusterSharding(system).start(
        typeName = "Server",
        entityProps = serverProps(),
        settings = ClusterShardingSettings(system),
        extractEntityId = extractEntityId,
        extractShardId = extractShardId)

      // this will be sent once on all nodes
      region ! CommandEnvelope(1, UsageReport(1, 1))

      within(10.seconds) {
        val probe = TestProbe()
        awaitAssert {
          region.tell(CommandEnvelope(1, GetUsage), probe.ref)
          probe.expectMsg(3.second, State(4, 1))
        }
      }

      enterBarrier("first-message-done")

      runOn(dc1node1) {
        // stop replicated entity on the DC-A
        region ! CommandEnvelope(1, Stop)
      }

      runOn(dc2node3) {
        // send commands to replicated entity on DC-B
        region ! CommandEnvelope(1, UsageReport(1, 2))
        region ! CommandEnvelope(1, UsageReport(1, 3))
        region ! CommandEnvelope(1, UsageReport(1, 4))

        within(10.seconds) {
          val probe = TestProbe()
          awaitAssert {
            region.tell(CommandEnvelope(1, GetUsage), probe.ref)
            probe.expectMsg(3.second, State(7, 4))
          }
        }
      }

      enterBarrier("commands-sent-single-dc")

      runOn(dc1node1) {
        // bring back replicated entity on the DC-A
        within(10.seconds) {
          val probe = TestProbe()
          awaitAssert {
            region.tell(CommandEnvelope(1, GetUsage), probe.ref)
            probe.expectMsg(3.second, State(7, 4))
          }
        }
      }

      enterBarrier("finish")
    }
  }

}