Auction Example

Having to use CRDTs for replication might seem like a severe constraint because the set of predefined CRDTs is quite limited. In this example we want to show that real-world applications can be implemented by designing events in a way that they don’t conflict. In the end, you will end up with a solution based on a custom CRDT.

We are building a small auction service. An auction is represented by one replicated actor. It should have the following operations:

  • Place a bid
  • Get the currently highest bid
  • Finish the auction

We model those operations as commands to be sent to the auction actor:

Scala
// A single offer made by `bidder`. `timestamp` and `originDc` are filled in by the entity
// that registers the bid and are used to order bids with the same offer.
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)

// commands
sealed trait AuctionCommand
case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
case object Finish extends AuctionCommand // A timer needs to schedule this command at each replica
case object GetHighestBid extends AuctionCommand
case object IsClosed extends AuctionCommand
private case object Close extends AuctionCommand // Internal, should not be sent from the outside
Java
/**
 * Immutable value describing one bid: who placed it, the amount offered, and the
 * timestamp and data center (DC) stored with it.
 */
static class Bid implements Serializable {
  final String bidder;
  final int offer;
  final Instant timestamp;
  final String originDc;

  Bid(String bidder, int offer, Instant timestamp, String originDc) {
    this.bidder = bidder;
    this.offer = offer;
    this.timestamp = timestamp;
    this.originDc = originDc;
  }

  /** Returns a copy of this bid with {@code offer} replaced; all other fields are kept. */
  Bid withOffer(int offer) {
    return new Bid(bidder, offer, timestamp, originDc);
  }
}


// commands
/** Marker interface for all commands understood by the auction entity. */
interface AuctionCommand extends Serializable {}
/** Command: place a bid of {@code offer} on behalf of {@code bidder}. */
static class OfferBid implements AuctionCommand {
  final String bidder;
  final int offer;
  public OfferBid(String bidder, int offer) {
    this.bidder = bidder;
    this.offer = offer;
  }
}
// An auction coordinator needs to schedule this command to each replica
static class Finish implements AuctionCommand {
  static final Finish INSTANCE = new Finish();
  private Finish() {}
}
/** Query: ask for the currently highest bid. */
static class GetHighestBid implements AuctionCommand {
  static final GetHighestBid INSTANCE = new GetHighestBid();
  private GetHighestBid() {}
}
/** Query: ask whether the auction has been closed. */
static class IsClosed implements AuctionCommand {
  static final IsClosed INSTANCE = new IsClosed();
  private IsClosed() {}
}
// Internal, should not be sent from the outside
private static class Close implements AuctionCommand {
  static final Close INSTANCE = new Close();
  private Close() {}
}

The auction entity is an event-sourced persistent actor. These events are used to persist state changes:

Scala
// events
sealed trait AuctionEvent
// a bid was accepted by the replica that received the corresponding OfferBid command
case class BidRegistered(bid: Bid) extends AuctionEvent
// the auction was finished in data center `atDc`
case class AuctionFinished(atDc: String) extends AuctionEvent
// the winner was decided; carries the winning bid and the highest counter offer
case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
Java
// events
/** Marker interface for all events persisted by the auction entity. */
interface AuctionEvent extends Serializable {}
/** Event: a bid was accepted. */
static class BidRegistered implements AuctionEvent {
  final Bid bid;
  public BidRegistered(Bid bid) {
    this.bid = bid;
  }
}
/** Event: the auction was finished in data center {@code atDc}. */
static class AuctionFinished implements AuctionEvent {
  final String atDc;
  public AuctionFinished(String atDc) {
    this.atDc = atDc;
  }
}
/** Event: the winner was decided; carries the winning bid and the highest counter offer. */
static class WinnerDecided implements AuctionEvent {
  final String atDc;
  final Bid winningBid;
  final int highestCounterOffer;
  public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) {
    this.atDc = atDc;
    this.winningBid = winningBid;
    this.highestCounterOffer = highestCounterOffer;
  }
}

You may have noticed that we include the highestCounterOffer in the WinnerDecided event. This is because we use a popular auction style where the winner does not have to pay the highest price that was bid, but only just enough to beat the second-highest bid.

Let’s have a look at the auction entity that will handle incoming commands:

Scala
// Static parameters an auction entity is created with.
// `closingAt` is the time at which the entity schedules its own `Finish` command;
// `responsibleForClosing` marks the replica that may decide the winner.
case class AuctionSetup(
  name:                  String,
  initialBid:            Bid, // the initial bid is basically the minimum price bidden at start time by the owner
  closingAt:             Instant,
  responsibleForClosing: Boolean)

/**
 * Replicated auction entity.
 *
 * Commands are dispatched by phase: the `running` handler is used while the state is in
 * the `Running` phase, the `finished` handler in every other phase.
 */
class AuctionEntity(auctionSetup: AuctionSetup)
  extends ReplicatedEntity[AuctionCommand, AuctionEvent, AuctionState] {

  // The auction starts in the `Running` phase; the initial bid is both the highest bid
  // and the highest counter offer.
  override def initialState: AuctionState = {
    val openingBid = auctionSetup.initialBid
    AuctionState(phase = Running, highestBid = openingBid, highestCounterOffer = openingBid.offer)
  }

  override def commandHandler: CommandHandler =
    CommandHandler.byState {
      case AuctionState(phase, _, _) if phase == Running => running
      case _                                             => finished
    }

  // Handler used while bids are still accepted.
  private val running: CommandHandler =
    CommandHandler {
      case (_, _, OfferBid(bidder, offer)) =>
        // stamp the bid with the local time and the DC it was received in
        val registeredAt = Instant.ofEpochMilli(currentTimeMillis())
        Effect.persist(BidRegistered(Bid(bidder, offer, registeredAt, selfDc)))
      case (ctx, state, GetHighestBid) =>
        // reply with the highest bid, but show the highest counter offer as its amount
        val publicBid = state.highestBid.copy(offer = state.highestCounterOffer)
        ctx.sender() ! publicBid
        Effect.none
      case (_, _, Finish) =>
        log.info("Finish")
        Effect.persist(AuctionFinished(selfDc))
      case (_, _, Close) =>
        log.warning("Premature close")
        // Close should only be triggered when we have already finished
        Effect.unhandled
      case (ctx, _, IsClosed) =>
        ctx.sender() ! false
        Effect.none
    }

  // Handler used once the auction has left the `Running` phase.
  private val finished: CommandHandler =
    CommandHandler {
      case (_, _, _: OfferBid) =>
        // auction finished, no more bids accepted
        Effect.unhandled
      case (ctx, state, GetHighestBid) =>
        ctx.sender() ! state.highestBid
        Effect.none
      case (ctx, state, IsClosed) =>
        val closed = state.phase == Closed
        ctx.sender() ! closed
        Effect.none
      case (_, _, Finish) =>
        log.info("Finish")
        Effect.persist(AuctionFinished(selfDc))
      case (_, state, Close) =>
        log.info("Close")
        require(shouldClose(state))
        // TODO send email (before or after persisting)
        Effect.persist(WinnerDecided(selfDc, state.highestBid, state.highestCounterOffer))
    }

  // All state transitions live in AuctionState itself.
  override def eventHandler(state: AuctionState, event: AuctionEvent): AuctionState =
    state.applyEvent(event)

  // True only on the replica responsible for closing, and only when no DC in `allDcs`
  // is missing from the set of DCs that have already finished.
  private def shouldClose(state: AuctionState): Boolean =
    state.phase match {
      case Closing(finishedDcs) if auctionSetup.responsibleForClosing =>
        allDcs.diff(finishedDcs).isEmpty
      case _ =>
        false
    }

  private def triggerCloseIfNeeded(ctx: ActorContext, state: AuctionState): Unit =
    if (shouldClose(state)) ctx.self ! Close

  // Reacts to `AuctionFinished` events while closing: if this DC has not finished yet it
  // sends `Finish` to itself, otherwise it checks whether the auction can be closed.
  override def eventTrigger(
    ctx:   EventTriggerContext,
    state: AuctionState, event: AuctionEvent): Effect[AuctionEvent, AuctionState] = {
    (event, state.phase) match {
      case (AuctionFinished(finishedAt), Closing(finishedDcs)) =>
        log.info("AuctionFinished at {}, already finished at [{}]", finishedAt,
          finishedDcs.mkString(", "))
        if (finishedDcs.contains(selfDc))
          triggerCloseIfNeeded(ctx.actorContext, state)
        else {
          log.info("Sending finish to self")
          ctx.actorContext.self ! Finish
        }
      case _ => // no trigger for other events or phases
    }
    Effect.none
  }

  // After recovery: close if that is already possible, then schedule `Finish` for the
  // configured closing time.
  override def recoveryCompleted(ctx: ActorContext, state: AuctionState): Effect[AuctionEvent, AuctionState] = {
    triggerCloseIfNeeded(ctx, state)

    val untilClosing = (auctionSetup.closingAt.toEpochMilli - currentTimeMillis()).millis
    ctx.timers.startSingleTimer(FinishTimer, Finish, untilClosing)

    Effect.none
  }

}
Java
/**
 * Static parameters an auction entity is created with: the auction name, the initial bid,
 * the closing time and whether this replica is responsible for closing the auction.
 */
static class AuctionSetup {
  final String name;
  final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner
  final Instant closingAt;
  final boolean responsibleForClosing;
  public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) {
    this.name = name;
    this.initialBid = initialBid;
    this.closingAt = closingAt;
    this.responsibleForClosing = responsibleForClosing;
  }
}

/**
 * Replicated auction entity (Java API).
 *
 * Commands are handled by {@code running()} while {@code state.stillRunning} is true and
 * by {@code finished()} otherwise.
 */
static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> {

  final AuctionSetup auctionSetup;

  public AuctionEntity(AuctionSetup auctionSetup) {
    this.auctionSetup = auctionSetup;
  }

  /** The auction starts running, with the initial bid as highest bid and counter offer. */
  @Override
  public AuctionState initialState() {
    Bid openingBid = auctionSetup.initialBid;
    return new AuctionState(true, openingBid, openingBid.offer, Collections.emptySet());
  }

  @Override
  public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() {
    return byStateCommandHandlerBuilder(AuctionState.class)
        .matchState(current -> current.stillRunning, running())
        .matchAny(finished());
  }

  /** Handler used while bids are still accepted. */
  private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() {
    return commandHandlerBuilder(AuctionCommand.class)
        .matchCommand(OfferBid.class, (ctx, state, cmd) -> {
          // stamp the bid with the local time and the DC it was received in
          Instant registeredAt = Instant.ofEpochMilli(currentTimeMillis());
          Bid bid = new Bid(cmd.bidder, cmd.offer, registeredAt, selfDc());
          return Effect().persist(new BidRegistered(bid));
        })
        .matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> {
          // reply with the highest bid, but show the highest counter offer as its amount
          Bid publicBid = state.highestBid.withOffer(state.highestCounterOffer);
          ctx.getSender().tell(publicBid, getSelf());
          return Effect().none();
        })
        .matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> {
          log().info("Finish");
          return Effect().persist(new AuctionFinished(selfDc()));
        })
        .matchExactCommand(Close.INSTANCE, (ctx, state, close) -> {
          log().warning("Premature close");
          // Close should only be triggered when we have already finished
          return Effect().unhandled();
        })
        .matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> {
          ctx.getSender().tell(false, getSelf());
          return Effect().none();
        })
        .build();
  }

  /** Handler used once the auction is no longer running. */
  private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() {
    return commandHandlerBuilder(AuctionCommand.class)
        .matchCommand(OfferBid.class, (ctx, state, cmd) -> {
          // auction finished, no more bids accepted
          return Effect().unhandled();
        })
        .matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> {
          ctx.getSender().tell(state.highestBid, getSelf());
          return Effect().none();
        })
        .matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> {
          log().info("Finish");
          return Effect().persist(new AuctionFinished(selfDc()));
        })
        .matchExactCommand(Close.INSTANCE, (ctx, state, close) -> {
          log().info("Close");
          // TODO send email (before or after persisting)
          WinnerDecided decided =
              new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer);
          return Effect().persist(decided);
        })
        .matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> {
          boolean closed = state.isClosed();
          ctx.getSender().tell(closed, getSelf());
          return Effect().none();
        })
        .build();
  }

  /** Applies persisted events to the state. */
  @Override
  public EventHandler<AuctionEvent, AuctionState> eventHandler() {
    return eventHandlerBuilder(AuctionEvent.class)
        .matchEvent(BidRegistered.class, (state, registered) -> {
          boolean outbids = AuctionState.isHigherBid(registered.bid, state.highestBid);
          return outbids
              ? state.withNewHighestBid(registered.bid)
              : state.withTooLowBid(registered.bid);
        })
        .matchEvent(AuctionFinished.class, (state, finishedEvent) -> {
          // once closed, further AuctionFinished events leave the state untouched
          return state.isClosed() ? state : state.addFinishedAtDc(finishedEvent.atDc);
        })
        .matchEvent(WinnerDecided.class, (state, decided) -> {
          return state.close();
        })
        .build();
  }

  // True only on the replica responsible for closing, while not yet closed, and only when
  // the set of finished DCs equals the set of all DCs.
  private boolean shouldClose(AuctionState state) {
    if (!auctionSetup.responsibleForClosing || state.isClosed()) {
      return false;
    }
    return getAllDcs().equals(state.finishedAtDc);
  }

  private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) {
    if (!shouldClose(state)) {
      return;
    }
    ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf());
  }


  // Reacts to AuctionFinished events while not closed: if this DC has not finished yet it
  // sends Finish to itself, otherwise it checks whether the auction can be closed.
  @Override
  public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx,
                            AuctionState state, AuctionEvent event) {
    if (!(event instanceof AuctionFinished) || state.isClosed()) {
      return Effect().none();
    }

    AuctionFinished finishedEvent = (AuctionFinished) event;
    log().info("AuctionFinished at {}, already finished at [{}]",
        finishedEvent.atDc, state.finishedAtDc);
    ActorContext actorCtx = ctx.actorContext();
    boolean finishedHere = state.finishedAtDc.contains(getSelfDc());
    if (finishedHere) {
      triggerCloseIfNeeded(actorCtx, state);
    } else {
      log().info("Sending finish to self");
      actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf());
    }

    return Effect().none();
  }

  // After recovery: close if that is already possible, then schedule Finish for the
  // configured closing time.
  @Override
  public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) {
    triggerCloseIfNeeded(ctx, state);

    long nowMillis = currentTimeMillis();
    long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - nowMillis;
    ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE,
        Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS));
    return Effect().none();
  }

}

The auction entity is started with the initial parameters for the auction. As seen before, replicated entities need to be parameterized with the types for commands and events and also for the internal state.

In the initialState method, a replicated entity needs to define its original state. In our case, it’s straightforward to initialize the initial state from our initialization parameters as given in the AuctionSetup instance. The minimum bid is in our case modelled as an initialBid.

The command handler defines how to react to external commands. In our case, for OfferBid and Finish we do nothing more than emit events corresponding to the command. For GetHighestBid we respond with details from the state. Note that, while the auction is still running, we overwrite the actual offer of the highest bid here with the amount of the highestCounterOffer. This is done to follow the popular auction style where the actual highest bid is never publicly revealed.

Let’s have a look at our state class, AuctionState which also represents the CRDT in our example.

Scala

// Key of the single timer that delivers the `Finish` command.
private val FinishTimer = "FinishTimer"

/**
 * State of the auction. All transitions are driven by `applyEvent`.
 *
 * @param phase               current workflow phase of the auction
 * @param highestBid          the bid that currently wins according to `isHigherBid`
 * @param highestCounterOffer the highest amount offered by any bid other than `highestBid`
 */
case class AuctionState(
  phase:               AuctionPhase,
  highestBid:          Bid,
  highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
) {

  /** Returns the state that results from applying `event` to this state. */
  def applyEvent(event: AuctionEvent): AuctionState =
    event match {
      case BidRegistered(b) =>
        if (isHigherBid(b, highestBid)) withNewHighestBid(b)
        else withTooLowBid(b)
      case AuctionFinished(atDc) =>
        phase match {
          case Running =>
            copy(phase = Closing(Set(atDc)))
          case Closing(alreadyFinishedDcs) =>
            copy(phase = Closing(alreadyFinishedDcs + atDc))
          case _ =>
            this
        }
      case _: WinnerDecided =>
        copy(phase = Closed)
    }

  /** Makes `bid` the highest bid; the offer of the previous highest bid becomes the counter offer. */
  def withNewHighestBid(bid: Bid): AuctionState = {
    require(phase != Closed)
    require(isHigherBid(bid, highestBid))
    copy(
      highestBid = bid,
      highestCounterOffer = highestBid.offer // keep last highest bid around
    )
  }

  /** Keeps the highest bid and raises the counter offer to `bid.offer` if that is higher. */
  def withTooLowBid(bid: Bid): AuctionState = {
    require(phase != Closed)
    require(isHigherBid(highestBid, bid))
    copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
  }

  /**
   * Ordering between bids: true if `first` beats `second`. Returns false in both directions
   * when offer, timestamp and origin DC are all equal.
   */
  def isHigherBid(first: Bid, second: Bid): Boolean =
    first.offer > second.offer ||
      (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
      // If timestamps are equal, choose by dc where the offer was submitted
      // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
      // particular DC would not be an advantage.
      (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0)
}
Java
/**
 * Immutable state of the auction (Java API).
 *
 * The auction is running while {@code stillRunning} is true. After the first
 * {@code addFinishedAtDc} call it is no longer running and {@code finishedAtDc} holds the
 * DCs that have finished. {@code close()} clears that set, which {@code isClosed()}
 * interprets as "closed".
 *
 * Note: the sanity checks below use {@code assert} and are therefore only evaluated when
 * the JVM runs with assertions enabled.
 */
static class AuctionState {

  final boolean stillRunning;
  final Bid highestBid;
  // in ebay style auctions, we need to keep track of current highest counter offer
  final int highestCounterOffer;
  final Set<String> finishedAtDc;

  AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) {
    this.stillRunning = stillRunning;
    this.highestBid = highestBid;
    this.highestCounterOffer = highestCounterOffer;
    this.finishedAtDc = finishedAtDc;
  }

  /** Makes {@code bid} the highest bid; the previous highest offer becomes the counter offer. */
  AuctionState withNewHighestBid(Bid bid) {
    assert(stillRunning);
    assert(isHigherBid(bid, highestBid));
    return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around
  }


  /** Keeps the highest bid and raises the counter offer to {@code bid.offer} if that is higher. */
  AuctionState withTooLowBid(Bid bid) {
    assert(stillRunning);
    assert(isHigherBid(highestBid, bid));
    return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc);
  }

  /**
   * Ordering between bids: true if {@code first} beats {@code second}. Returns false in
   * both directions when offer, timestamp and origin DC are all equal.
   *
   * Returns a primitive {@code boolean}; the result is only ever used in conditions, so
   * boxing it is unnecessary.
   */
  static boolean isHigherBid(Bid first, Bid second) {
    return first.offer > second.offer ||
      (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
      // If timestamps are equal, choose by dc where the offer was submitted
      // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
      // particular DC would not be an advantage.
      (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0);
  }

  /** Returns a non-running state whose finished set additionally contains {@code dc}. */
  AuctionState addFinishedAtDc(String dc) {
    Set<String> s = new HashSet<>(finishedAtDc);
    s.add(dc);
    return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s));
  }

  /** Returns the closed state: not running and with an empty finished set. */
  public AuctionState close() {
    return new AuctionState(
        false,
        highestBid,
        highestCounterOffer,
        Collections.emptySet()
    );
  }

  /** True when the auction is not running and the finished set is empty. */
  public boolean isClosed() {
    return !stillRunning && finishedAtDc.isEmpty();
  }
}

The state keeps track of whether the auction is still active (a phase in the Scala version, a flag plus the set of finished DCs in the Java version), the currently highest bid, and the highest counter offer so far.

In the eventHandler, we handle persisted events to drive the state change. When a new bid is registered,

  • it needs to be decided whether the new bid is the winning bid or not
  • the state needs to be updated accordingly

The point of CRDTs is that the state must end up being the same regardless of the order in which the events have been processed. We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an ordering on all bids, it should suffice to compare the new bid with the currently highest bid to eventually end up with the globally highest one, regardless of the order in which the events come in.

The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state outside of our state class so that all replicas come to the same result. We define the ordering as this:

  • A higher bid wins.
  • If there’s a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the (local) timestamp the bid was registered.
  • We need to make sure that no timestamp is used twice in the same DC (missing in this example).
  • If there’s a tie between the timestamp, we define an arbitrary but deterministic ordering on the DCs, in our case we just compare the name strings of the DCs. That’s why we need to keep the identifier of the DC where a bid was registered for every Bid.

If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the highestCounterOffer. If the new bid was lower, we just update the highestCounterOffer if necessary.

Using those rules, the order of incoming events does not matter. Replicas in all DCs should eventually converge to the same result.

Open questions

The auction example shows basic features of an auction. There are a few additional considerations:

  • Replicas only eventually converge to the same result. That might lead to surprising results because highest bids from replicas other than the local one might only turn up with a delay. Another surprising result might be that two bids with the same amount, each issued to a different replica in quick succession, might be ordered differently due to clock differences between replicas. In a real bidding system, it needs to be made sure that no replica has a competitive advantage over another one.

Complete example source code

For reference here’s the complete example, including imports and tests:

Scala
/**
 * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.persistence.multidc.scaladsl

//#full-example
import java.time.Instant
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.persistence.multidc.PersistenceMultiDcSettings

import org.scalatest.BeforeAndAfterAll
import org.scalatest.Matchers
import org.scalatest.WordSpecLike
import org.scalatest.BeforeAndAfter
import akka.persistence.multidc.testkit._
//#disable-replication
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._

//#disable-replication

import akka.testkit.ImplicitSender
import akka.testkit.TestKit

object AuctionExampleSpec {
  //#full-example
  // Amounts are modelled as plain integers in this example.
  type MoneyAmount = Int

  //#auction-commands
  // A single offer made by `bidder`. `timestamp` and `originDc` are filled in by the entity
  // that registers the bid and are used to order bids with the same offer.
  case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String)

  // commands
  sealed trait AuctionCommand
  case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
  case object Finish extends AuctionCommand // A timer needs to schedule this command at each replica
  case object GetHighestBid extends AuctionCommand
  case object IsClosed extends AuctionCommand
  private case object Close extends AuctionCommand // Internal, should not be sent from the outside
  //#auction-commands

  //#auction-events
  // events
  sealed trait AuctionEvent
  // a bid was accepted by the replica that received the corresponding OfferBid command
  case class BidRegistered(bid: Bid) extends AuctionEvent
  // the auction was finished in data center `atDc`
  case class AuctionFinished(atDc: String) extends AuctionEvent
  // the winner was decided; carries the winning bid and the highest counter offer
  case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
  //#auction-events

  // Props for one auction replica: a new AuctionEntity is created from `auctionSetup` each
  // time the factory is invoked; "auction" and `pid` are passed through to
  // ReplicatedEntity.props together with `settings`.
  def auctionProps(pid: String, auctionSetup: AuctionSetup, settings: PersistenceMultiDcSettings): Props =
    ReplicatedEntity.props("auction", pid, () => new AuctionEntity(auctionSetup), settings)

  /**
   * The auction passes through several workflow phases.
   * First, in `Running`, `OfferBid` commands are accepted.
   *
   * `AuctionEntity` instances in all DCs schedule a `Finish` command
   * at a given time. That persists the `AuctionFinished` event and the
   * phase is `Closing` until the auction is finished in all DCs.
   *
   * Once the auction has been finished, no more `OfferBid` commands are accepted.
   *
   * The auction is also finished immediately if an `AuctionFinished` event from another
   * DC is seen before the scheduled `Finish` command. In that way the auction is finished
   * as quickly as possible in all DCs even though there might be some clock skew.
   *
   * One DC is responsible for finally deciding the winner and publishing the result.
   * All events must be collected from all DCs before that can happen.
   * When the responsible DC has seen the `AuctionFinished` events from all DCs,
   * all other events have also been propagated and it can persist `WinnerDecided`;
   * the auction is then finally `Closed`.
   *
   */
  sealed trait AuctionPhase
  case object Running extends AuctionPhase
  case class Closing(finishedAtDc: Set[String]) extends AuctionPhase
  case object Closed extends AuctionPhase

  //#auction-state

  // Key of the single timer that delivers the `Finish` command.
  private val FinishTimer = "FinishTimer"

  /**
   * State of the auction. All transitions are driven by `applyEvent`.
   *
   * @param phase               current workflow phase of the auction
   * @param highestBid          the bid that currently wins according to `isHigherBid`
   * @param highestCounterOffer the highest amount offered by any bid other than `highestBid`
   */
  case class AuctionState(
    phase:               AuctionPhase,
    highestBid:          Bid,
    highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
  ) {

    /** Returns the state that results from applying `event` to this state. */
    def applyEvent(event: AuctionEvent): AuctionState =
      event match {
        case BidRegistered(bid) if isHigherBid(bid, highestBid) =>
          withNewHighestBid(bid)
        case BidRegistered(bid) =>
          withTooLowBid(bid)
        case AuctionFinished(atDc) =>
          phase match {
            case Running             => copy(phase = Closing(Set(atDc)))
            case Closing(finishedAt) => copy(phase = Closing(finishedAt + atDc))
            case _                   => this
          }
        case _: WinnerDecided =>
          copy(phase = Closed)
      }

    /** Makes `bid` the highest bid; the offer of the previous highest bid becomes the counter offer. */
    def withNewHighestBid(bid: Bid): AuctionState = {
      require(phase != Closed)
      require(isHigherBid(bid, highestBid))
      val previousHighestOffer = highestBid.offer // keep last highest bid around
      copy(highestBid = bid, highestCounterOffer = previousHighestOffer)
    }

    /** Keeps the highest bid and raises the counter offer to `bid.offer` if that is higher. */
    def withTooLowBid(bid: Bid): AuctionState = {
      require(phase != Closed)
      require(isHigherBid(highestBid, bid))
      val raisedCounterOffer = math.max(highestCounterOffer, bid.offer) // update highest counter offer
      copy(highestCounterOffer = raisedCounterOffer)
    }

    /**
     * Ordering between bids: true if `first` beats `second`. Returns false in both directions
     * when offer, timestamp and origin DC are all equal.
     */
    def isHigherBid(first: Bid, second: Bid): Boolean =
      if (first.offer != second.offer)
        first.offer > second.offer
      else if (!first.timestamp.equals(second.timestamp))
        first.timestamp.isBefore(second.timestamp) // if equal, first one wins
      else
        // If timestamps are equal, choose by dc where the offer was submitted
        // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
        // particular DC would not be an advantage.
        first.originDc.compareTo(second.originDc) < 0
  }
  //#auction-state

  //#auction-actor
  // Static parameters an auction entity is created with.
  // `closingAt` is the time at which the entity schedules its own `Finish` command;
  // `responsibleForClosing` marks the replica that may decide the winner.
  case class AuctionSetup(
    name:                  String,
    initialBid:            Bid, // the initial bid is basically the minimum price bidden at start time by the owner
    closingAt:             Instant,
    responsibleForClosing: Boolean)

  /**
   * Replicated auction entity.
   *
   * Commands are dispatched by phase: the `running` handler is used while the state is in
   * the `Running` phase, the `finished` handler in every other phase.
   */
  class AuctionEntity(auctionSetup: AuctionSetup)
    extends ReplicatedEntity[AuctionCommand, AuctionEvent, AuctionState] {

    // The auction starts in the `Running` phase; the initial bid is both the highest bid
    // and the highest counter offer.
    override def initialState: AuctionState = {
      val openingBid = auctionSetup.initialBid
      AuctionState(phase = Running, highestBid = openingBid, highestCounterOffer = openingBid.offer)
    }

    override def commandHandler: CommandHandler =
      CommandHandler.byState {
        case AuctionState(phase, _, _) if phase == Running => running
        case _                                             => finished
      }

    // Handler used while bids are still accepted.
    private val running: CommandHandler =
      CommandHandler {
        case (_, _, OfferBid(bidder, offer)) =>
          // stamp the bid with the local time and the DC it was received in
          val registeredAt = Instant.ofEpochMilli(currentTimeMillis())
          Effect.persist(BidRegistered(Bid(bidder, offer, registeredAt, selfDc)))
        case (ctx, state, GetHighestBid) =>
          // reply with the highest bid, but show the highest counter offer as its amount
          val publicBid = state.highestBid.copy(offer = state.highestCounterOffer)
          ctx.sender() ! publicBid
          Effect.none
        case (_, _, Finish) =>
          log.info("Finish")
          Effect.persist(AuctionFinished(selfDc))
        case (_, _, Close) =>
          log.warning("Premature close")
          // Close should only be triggered when we have already finished
          Effect.unhandled
        case (ctx, _, IsClosed) =>
          ctx.sender() ! false
          Effect.none
      }

    // Handler used once the auction has left the `Running` phase.
    private val finished: CommandHandler =
      CommandHandler {
        case (_, _, _: OfferBid) =>
          // auction finished, no more bids accepted
          Effect.unhandled
        case (ctx, state, GetHighestBid) =>
          ctx.sender() ! state.highestBid
          Effect.none
        case (ctx, state, IsClosed) =>
          val closed = state.phase == Closed
          ctx.sender() ! closed
          Effect.none
        case (_, _, Finish) =>
          log.info("Finish")
          Effect.persist(AuctionFinished(selfDc))
        case (_, state, Close) =>
          log.info("Close")
          require(shouldClose(state))
          // TODO send email (before or after persisting)
          Effect.persist(WinnerDecided(selfDc, state.highestBid, state.highestCounterOffer))
      }

    // All state transitions live in AuctionState itself.
    override def eventHandler(state: AuctionState, event: AuctionEvent): AuctionState =
      state.applyEvent(event)

    // True only on the replica responsible for closing, and only when no DC in `allDcs`
    // is missing from the set of DCs that have already finished.
    private def shouldClose(state: AuctionState): Boolean =
      state.phase match {
        case Closing(finishedDcs) if auctionSetup.responsibleForClosing =>
          allDcs.diff(finishedDcs).isEmpty
        case _ =>
          false
      }

    private def triggerCloseIfNeeded(ctx: ActorContext, state: AuctionState): Unit =
      if (shouldClose(state)) ctx.self ! Close

    // Reacts to `AuctionFinished` events while closing: if this DC has not finished yet it
    // sends `Finish` to itself, otherwise it checks whether the auction can be closed.
    override def eventTrigger(
      ctx:   EventTriggerContext,
      state: AuctionState, event: AuctionEvent): Effect[AuctionEvent, AuctionState] = {
      (event, state.phase) match {
        case (AuctionFinished(finishedAt), Closing(finishedDcs)) =>
          log.info("AuctionFinished at {}, already finished at [{}]", finishedAt,
            finishedDcs.mkString(", "))
          if (finishedDcs.contains(selfDc))
            triggerCloseIfNeeded(ctx.actorContext, state)
          else {
            log.info("Sending finish to self")
            ctx.actorContext.self ! Finish
          }
        case _ => // no trigger for other events or phases
      }
      Effect.none
    }

    // After recovery: close if that is already possible, then schedule `Finish` for the
    // configured closing time.
    override def recoveryCompleted(ctx: ActorContext, state: AuctionState): Effect[AuctionEvent, AuctionState] = {
      triggerCloseIfNeeded(ctx, state)

      val untilClosing = (auctionSetup.closingAt.toEpochMilli - currentTimeMillis()).millis
      ctx.timers.startSingleTimer(FinishTimer, Finish, untilClosing)

      Effect.none
    }

  }
  //#auction-actor

  //#full-example
  // Names of the data centers used by this spec; the first one is the default for createSystem.
  val allDcs = Seq("DC-A", "DC-B")

  // Base configuration shared by every ActorSystem started by this spec.
  val clusterConfig = ConfigFactory.parseString("""
    akka.actor.provider = "cluster"
    akka.remote.netty.tcp.port = 0
    akka.remote.classic.netty.tcp.port = 0
    akka.remote.artery.canonical.port = 0
    akka.remote.artery.canonical.hostname = 127.0.0.1
    akka.cluster.jmx.multi-mbeans-in-same-jvm = on

    # avoid eager init in tests
    akka.persistence.multi-data-center.hot-standby.enabled = off

    # FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
    akka.actor.allow-java-serialization = on
    akka.actor.warn-about-java-serializer-usage = off

    # speed up joining and such
    akka.cluster.gossip-interval = 500 ms
  """)

  // Creates the ActorSystem for data center `dc` (the first entry of `allDcs` by default).
  // Configuration precedence: clusterConfig, then CassandraLifecycle.config, then the
  // multi-DC test settings. The `//#disable-replication` markers below are snippet
  // markers and must stay where they are.
  def createSystem(dc: String = allDcs.head) =
    ActorSystem(
      "AuctionExampleSpec",
      clusterConfig
        .withFallback(CassandraLifecycle.config)
        //#disable-replication
        // Include persistenceMultiDcTestSettings in your ActorSystem configuration
        // to allow suspending replication through PersistenceMultiDcTestKit
        .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs)))

  //#disable-replication

}

/**
 * Exercises the auction example with two actor systems, one per entry of `allDcs`, and checks
 * that the auction replicas running in them agree on the highest bid and on being closed.
 *
 * Each test builds its own [[TestSetup]], so every test works on a separate pair of replicas.
 */
class AuctionExampleSpec extends TestKit(AuctionExampleSpec.createSystem())
  with ImplicitSender
  with WordSpecLike
  with Matchers
  with BeforeAndAfterAll
  with BeforeAndAfter
  with CassandraLifecycleScalatest {
  import AuctionExampleSpec._

  val systemName = "AuctionExampleSpec"
  // `system` comes from createSystem() and therefore belongs to the first DC in allDcs
  val settings = PersistenceMultiDcSettings(system)

  // second actor system, belonging to the second DC in allDcs
  val otherSystem = createSystem(allDcs(1))
  val otherSettings = PersistenceMultiDcSettings(otherSystem)

  val allSystems = Seq(system, otherSystem)

  /**
   * Per-test fixture: starts one auction replica in each actor system. Both replicas use the
   * same `bikeAuction-<testName>` id and the same setup, except that only replica A is
   * responsible for closing the auction.
   */
  class TestSetup(testName: String) {
    val minimumBid = 12
    val auctionSetupA = AuctionSetup("bicycle", Bid("me", minimumBid, Instant.now(), ""),
      closingAt = Instant.now().plusSeconds(60), responsibleForClosing = true)
    val auctionSetupB = auctionSetupA.copy(responsibleForClosing = false)

    val nodeA = system.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupA, settings), s"auction-A-$testName")
    val nodeB = otherSystem.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupB, otherSettings), s"auction-B-$testName")

    /** Asks `node` for its highest bid and returns the `Bid` it replies with. */
    def expectHighestBid(node: ActorRef): Bid = {
      node ! GetHighestBid
      expectMsgType[Bid]
    }

    /** Asserts that the highest bid reported by `node` has the given bidder and offer. */
    def expectHighestBid(node: ActorRef, bidder: String, expected: MoneyAmount): Unit = {
      val bid = expectHighestBid(node)
      bid.offer shouldEqual expected
      bid.bidder shouldEqual bidder
    }

    /** Asserts that `node` answers `IsClosed` with `true`. */
    def expectClosed(node: ActorRef): Unit = {
      node ! IsClosed
      expectMsg(true)
    }
  }

  "AuctionExample" should {
    "propagate highest bid to replicated actor" in new TestSetup("test1") {
      // simple bidding
      nodeA ! OfferBid("Mary", 42)
      expectHighestBid(nodeA, "Mary", minimumBid) // ebay style, still the minimum offer

      nodeA ! OfferBid("Paul", 41)
      expectHighestBid(nodeA, "Mary", 41) // ebay style, now updated to the highest counter offer

      awaitAssert {
        // check that results have propagated to b
        expectHighestBid(nodeB, "Mary", 41) // ebay style, now updated to the highest counter offer
      }

      // make sure that first bidder still keeps the crown
      nodeB ! OfferBid("c", 42)
      expectHighestBid(nodeB, "Mary", 42)
    }

    "eventually resolve conflicting bids during auction if bids are highest (but different) in each dc" in new TestSetup("test2") {
      // highest bid comes first
      nodeA ! OfferBid("Mary", 42)
      nodeB ! OfferBid("Paul", 41)

      awaitAssert {
        expectHighestBid(nodeA, "Mary", 41)
        expectHighestBid(nodeB, "Mary", 41)
      }

      // highest bid comes second
      nodeA ! OfferBid("Paul", 50)
      nodeB ! OfferBid("Kat", 60)

      awaitAssert {
        expectHighestBid(nodeA, "Kat", 50)
        expectHighestBid(nodeB, "Kat", 50)
      }
    }
    "eventually resolve conflicting bids during auction if bids are highest and equal (but different time) in each dc" in
      new TestSetup("test3") {
        nodeA ! OfferBid("Mary", 15)
        expectHighestBid(nodeA, "Mary", 12)

        nodeB ! OfferBid("Paul", 15)

        awaitAssert {
          expectHighestBid(nodeA, "Mary", 15)
          expectHighestBid(nodeB, "Mary", 15)
        }
      }

    "eventually come to a consistent final result" in
      new TestSetup("test4") {
        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)

        // Wait until both bids are visible in both DCs before finishing, instead of sleeping
        // for a fixed time: Paul leads and Mary's 42 is the highest counter offer.
        awaitAssert {
          expectHighestBid(nodeA, "Paul", 42)
          expectHighestBid(nodeB, "Paul", 42)
        }

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeA ! Finish

        awaitAssert {
          expectClosed(nodeA)
          expectClosed(nodeB)
        }

        expectHighestBid(nodeA, "Paul", 43)
        expectHighestBid(nodeB, "Paul", 43)
      }

    "eventually come to a consistent final result when finishing is initiated on node B" in
      new TestSetup("test5") {
        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)

        // Wait until both bids are visible in both DCs before finishing, instead of sleeping
        // for a fixed time: Paul leads and Mary's 42 is the highest counter offer.
        awaitAssert {
          expectHighestBid(nodeA, "Paul", 42)
          expectHighestBid(nodeB, "Paul", 42)
        }

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeB ! Finish

        awaitAssert {
          expectClosed(nodeA)
          expectClosed(nodeB)
        }

        expectHighestBid(nodeA, "Paul", 43)
        expectHighestBid(nodeB, "Paul", 43)
      }

    //#disable-replication
    "correctly resolve concurrent writes when replication is temporarily suspended" in
      new TestSetup("test6") {
        disableReplication(otherSystem, system)

        nodeA ! OfferBid("Mary", 42)
        nodeB ! OfferBid("Paul", 43)

        expectHighestBid(nodeA, "Mary", 12)

        enableReplication(otherSystem, system)

        awaitAssert {
          expectHighestBid(nodeA, "Paul", 42)
        }
      }
    //#disable-replication
  }

  // make sure a test that suspended replication cannot affect the tests that run after it
  after {
    allSystems.foreach { sys =>
      enableAllReplicationTo(sys)
    }
  }

  override protected def afterAll(): Unit = {
    allSystems.foreach(sys => shutdown(sys))
    super.afterAll()
  }
}
//#full-example
Java
/*
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.persistence.multidc.javadsl;

//#full-example
import java.io.Serializable;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.JAPI;
import akka.persistence.cassandra.testkit.CassandraLauncher;
import akka.persistence.multidc.PersistenceMultiDcSettings;
import akka.testkit.javadsl.TestKit;

//#cassandra-hook
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;

import akka.persistence.multidc.testkit.CassandraLifecycle;
//#disable-replication
import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit;

//#disable-replication

//#cassandra-hook
import scala.concurrent.duration.Duration;

import static org.junit.Assert.assertEquals;

//#cassandra-hook
public class AuctionExampleTest extends JUnitSuite {
//#cassandra-hook

  //#auction-commands
  /**
   * A single offer made in the auction. All fields are final; {@link #withOffer(int)} returns a
   * new instance instead of modifying this one.
   */
  static class Bid implements Serializable {
    final String bidder;
    final int offer;
    final Instant timestamp;
    // data center the bid was submitted in; last tie-breaker in AuctionState.isHigherBid
    final String originDc;

    Bid(String bidder, int offer, Instant timestamp, String originDc) {
      this.bidder = bidder;
      this.offer = offer;
      this.timestamp = timestamp;
      this.originDc = originDc;
    }

    /** Returns a copy of this bid in which only the offer is replaced by the given amount. */
    Bid withOffer(int offer) {
      return new Bid(bidder, offer, timestamp, originDc);
    }
  }


  // commands
  /** Common type of all commands handled by the auction entity. */
  interface AuctionCommand extends Serializable {}
  /** Places a bid of {@code offer} in the name of {@code bidder}. */
  static class OfferBid implements AuctionCommand {
    final String bidder;
    final int offer;
    public OfferBid(String bidder, int offer) {
      this.bidder = bidder;
      this.offer = offer;
    }
  }
  // Ends the bidding phase at the replica that receives it. Every replica has to receive this
  // command; AuctionEntity.recoveryCompleted starts a timer that sends it at the closing time.
  static class Finish implements AuctionCommand {
    static final Finish INSTANCE = new Finish();
    private Finish() {}
  }
  /** Query: the entity replies to the sender with a {@code Bid}. */
  static class GetHighestBid implements AuctionCommand {
    static final GetHighestBid INSTANCE = new GetHighestBid();
    private GetHighestBid() {}
  }
  /** Query: the entity replies to the sender with a boolean telling whether the auction is closed. */
  static class IsClosed implements AuctionCommand {
    static final IsClosed INSTANCE = new IsClosed();
    private IsClosed() {}
  }
  // Internal, should not be sent from the outside
  private static class Close implements AuctionCommand {
    static final Close INSTANCE = new Close();
    private Close() {}
  }
  //#auction-commands

  //#auction-events
  // events
  /** Common type of all events persisted by the auction entity. */
  interface AuctionEvent extends Serializable {}
  /** A bid was received; persisted for every OfferBid command while the auction is running. */
  static class BidRegistered implements AuctionEvent {
    final Bid bid;
    public BidRegistered(Bid bid) {
      this.bid = bid;
    }
  }
  /** The auction was finished at the data center {@code atDc}. */
  static class AuctionFinished implements AuctionEvent {
    final String atDc;
    public AuctionFinished(String atDc) {
      this.atDc = atDc;
    }
  }
  /**
   * The auction was closed at the data center {@code atDc}; carries the highest bid and the
   * highest counter offer of the state at that moment.
   */
  static class WinnerDecided implements AuctionEvent {
    final String atDc;
    final Bid winningBid;
    final int highestCounterOffer;
    public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) {
      this.atDc = atDc;
      this.winningBid = winningBid;
      this.highestCounterOffer = highestCounterOffer;
    }
  }
  //#auction-events


  //#auction-state
  /**
   * Immutable state of one auction replica. Every transition method returns a new instance.
   *
   * The auction is considered closed when it is no longer running and {@code finishedAtDc} is
   * empty; {@link #close()} produces exactly that combination.
   */
  static class AuctionState {

    // set to false by addFinishedAtDc and close()
    final boolean stillRunning;
    final Bid highestBid;
    // in ebay style auctions, we need to keep track of current highest counter offer
    final int highestCounterOffer;
    // data centers for which a finish has been recorded; emptied again by close()
    final Set<String> finishedAtDc;

    AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) {
      this.stillRunning = stillRunning;
      this.highestBid = highestBid;
      this.highestCounterOffer = highestCounterOffer;
      this.finishedAtDc = finishedAtDc;
    }

    /**
     * Makes {@code bid} the new highest bid; the offer of the previous highest bid becomes the
     * highest counter offer. Expects a running auction and a bid that beats the current one.
     */
    AuctionState withNewHighestBid(Bid bid) {
      assert(stillRunning);
      assert(isHigherBid(bid, highestBid));
      return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around
    }


    /**
     * Records a bid that did not beat the highest bid: the highest bid is kept and the counter
     * offer is raised to the bid's offer if that is larger.
     */
    AuctionState withTooLowBid(Bid bid) {
      assert(stillRunning);
      assert(isHigherBid(highestBid, bid));
      return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc);
    }

    /**
     * Decides whether {@code first} beats {@code second}: the larger offer wins, then the earlier
     * timestamp, then the smaller {@code originDc} name.
     *
     * @return true if {@code first} beats {@code second}; false otherwise, including when both
     *         bids agree in offer, timestamp and originDc
     */
    static boolean isHigherBid(Bid first, Bid second) {
      return first.offer > second.offer ||
        (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
        // If timestamps are equal, choose by dc where the offer was submitted
        // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
        // particular DC would not be an advantage.
        (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0);
    }

    /** Marks the auction as no longer running and adds {@code dc} to the finished data centers. */
    AuctionState addFinishedAtDc(String dc) {
      Set<String> s = new HashSet<>(finishedAtDc);
      s.add(dc);
      return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s));
    }

    /** Returns the closed state: not running, with the set of finished data centers cleared. */
    public AuctionState close() {
      return new AuctionState(
          false,
          highestBid,
          highestCounterOffer,
          Collections.emptySet()
      );
    }

    /** True once the auction is neither running nor waiting on any finished data center. */
    public boolean isClosed() {
      return !stillRunning && finishedAtDc.isEmpty();
    }
  }
  //#auction-state

  // Key of the single timer that AuctionEntity.recoveryCompleted starts for the Finish command
  private static final String FINISH_TIMER = "FinishTimer";

  //#auction-actor
  /** Fixed parameters one auction replica is created with. */
  static class AuctionSetup {
    final String name;
    final Bid initialBid; // the initial bid is basically the minimum price bid at start time by the owner
    // point in time for which the entity schedules its Finish command
    final Instant closingAt;
    // only a replica with this flag set sends itself the Close command (see AuctionEntity.shouldClose)
    final boolean responsibleForClosing;
    public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) {
      this.name = name;
      this.initialBid = initialBid;
      this.closingAt = closingAt;
      this.responsibleForClosing = responsibleForClosing;
    }
  }

  /**
   * Replicated entity implementing the auction. Commands are handled differently depending on
   * whether the state is still running; events are folded into an immutable {@code AuctionState}.
   */
  static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> {

    final AuctionSetup auctionSetup;

    public AuctionEntity(AuctionSetup auctionSetup) {
      this.auctionSetup = auctionSetup;
    }

    /** A running auction whose highest bid and counter offer both come from the initial bid. */
    @Override
    public AuctionState initialState() {
      final Bid openingBid = auctionSetup.initialBid;
      return new AuctionState(true, openingBid, openingBid.offer, Collections.emptySet());
    }

    /** Dispatches to {@link #running()} while the state is still running, else to {@link #finished()}. */
    @Override
    public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() {
      return byStateCommandHandlerBuilder(AuctionState.class)
          .matchState(current -> current.stillRunning, running())
          .matchAny(finished());
    }

    /** Command handling while bids are still accepted. */
    private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() {
      return commandHandlerBuilder(AuctionCommand.class)
          .matchCommand(OfferBid.class, (ctx, state, cmd) -> {
            // stamp the bid with the entity's current time and the DC it was received in
            Instant receivedAt = Instant.ofEpochMilli(currentTimeMillis());
            Bid bid = new Bid(cmd.bidder, cmd.offer, receivedAt, selfDc());
            return Effect().persist(new BidRegistered(bid));
          }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, cmd) -> {
            // while running, only the highest counter offer is revealed, not the real offer
            Bid visibleBid = state.highestBid.withOffer(state.highestCounterOffer);
            ctx.getSender().tell(visibleBid, getSelf());
            return Effect().none();
          }).matchExactCommand(Finish.INSTANCE, (ctx, state, cmd) -> {
            log().info("Finish");
            return Effect().persist(new AuctionFinished(selfDc()));
          }).matchExactCommand(Close.INSTANCE, (ctx, state, cmd) -> {
            // Close should only be triggered when we have already finished
            log().warning("Premature close");
            return Effect().unhandled();
          }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, cmd) -> {
            ctx.getSender().tell(false, getSelf());
            return Effect().none();
          }).build();
    }

    /** Command handling once the auction is no longer running. */
    private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() {
      return commandHandlerBuilder(AuctionCommand.class)
          .matchCommand(OfferBid.class, (ctx, state, cmd) -> {
            // auction finished, no more bids accepted
            return Effect().unhandled();
          }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, cmd) -> {
            // the highest bid is reported with its real offer here
            ctx.getSender().tell(state.highestBid, getSelf());
            return Effect().none();
          }).matchExactCommand(Finish.INSTANCE, (ctx, state, cmd) -> {
            log().info("Finish");
            return Effect().persist(new AuctionFinished(selfDc()));
          }).matchExactCommand(Close.INSTANCE, (ctx, state, cmd) -> {
            log().info("Close");
            // TODO send email (before or after persisting)
            WinnerDecided decision = new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer);
            return Effect().persist(decision);
          }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, cmd) -> {
            ctx.getSender().tell(state.isClosed(), getSelf());
            return Effect().none();
          }).build();
    }

    /** Folds events into the state; an AuctionFinished event leaves an already closed state untouched. */
    @Override
    public EventHandler<AuctionEvent, AuctionState> eventHandler() {
      return eventHandlerBuilder(AuctionEvent.class)
          .matchEvent(BidRegistered.class, (state, registered) -> {
            return AuctionState.isHigherBid(registered.bid, state.highestBid)
                ? state.withNewHighestBid(registered.bid)
                : state.withTooLowBid(registered.bid);
          }).matchEvent(AuctionFinished.class, (state, finishedEvent) -> {
            return state.isClosed()
                ? state // already closed
                : state.addFinishedAtDc(finishedEvent.atDc);
          }).matchEvent(WinnerDecided.class, (state, decided) -> {
            return state.close();
          })
          .build();
    }

    /**
     * Closing is due when this replica is responsible for it, the auction is not closed yet and
     * the set of finished data centers equals {@code getAllDcs()}.
     */
    private boolean shouldClose(AuctionState state) {
      if (!auctionSetup.responsibleForClosing || state.isClosed()) {
        return false;
      }
      return getAllDcs().equals(state.finishedAtDc);
    }

    /** Sends the internal Close command to this actor when {@link #shouldClose} says so. */
    private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) {
      if (!shouldClose(state)) {
        return;
      }
      ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf());
    }


    /**
     * Reacts to AuctionFinished events of an auction that is not closed yet: if this DC is already
     * among the finished ones a close may be triggered, otherwise Finish is sent to this actor so
     * that this DC finishes as well. Never persists anything itself.
     */
    @Override
    public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx,
                              AuctionState state, AuctionEvent event) {
      if (!(event instanceof AuctionFinished) || state.isClosed()) {
        return Effect().none();
      }

      AuctionFinished finishedEvent = (AuctionFinished) event;
      log().info("AuctionFinished at {}, already finished at [{}]",
          finishedEvent.atDc, state.finishedAtDc);

      ActorContext actorCtx = ctx.actorContext();
      boolean finishedHere = state.finishedAtDc.contains(getSelfDc());
      if (finishedHere) {
        triggerCloseIfNeeded(actorCtx, state);
      } else {
        log().info("Sending finish to self");
        actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf());
      }

      return Effect().none();
    }

    /**
     * After recovery: triggers a close if the recovered state requires one, then starts the
     * single timer for the Finish command, due at {@code auctionSetup.closingAt}.
     */
    @Override
    public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) {
      triggerCloseIfNeeded(ctx, state);

      long closesAtMillis = auctionSetup.closingAt.toEpochMilli();
      long millisUntilClosing = closesAtMillis - currentTimeMillis();
      ctx.getTimers().startSingleTimer(
          FINISH_TIMER,
          Finish.INSTANCE,
          Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS));
      return Effect().none();
    }

  }
  //#auction-actor

  /**
   * Builds the Props for one auction replica.
   *
   * @param pid id passed on to ReplicatedEntity.props; the tests use the same id for the replicas
   *            in both actor systems
   * @param auctionSetup setup handed to each AuctionEntity created from these Props
   * @param settings multi-DC settings of the actor system the replica is started in
   */
  static Props auctionProps(String pid, AuctionSetup auctionSetup, PersistenceMultiDcSettings settings) {
    return ReplicatedEntity.props(
        AuctionCommand.class,
        "auction",
        pid,
        () -> new AuctionEntity(auctionSetup),
        settings);
  }


  /**
   * Per-test fixture: starts one auction replica in each actor system under the same
   * {@code bikeAuction-<testName>} id and offers helpers to query them. Only replica A is
   * responsible for closing the auction.
   */
  static private class TestSetup extends TestKit {
    final String testName;
    final int minimumBid = 12;
    final AuctionSetup auctionSetupA;
    final AuctionSetup auctionSetupB;
    final ActorRef nodeA;
    final ActorRef nodeB;

    TestSetup(String testName) {
      super(system);
      this.testName = testName;

      // Both replicas of the auction must start from the same initial bid and closing time,
      // so these values are created once and shared by the two setups.
      final Bid initialBid = new Bid("me", minimumBid, Instant.now(), "");
      final Instant closingAt = Instant.now().plusSeconds(60);
      auctionSetupA = new AuctionSetup("bicycle", initialBid, closingAt, true);
      auctionSetupB = new AuctionSetup("bicycle", initialBid, closingAt, false);

      PersistenceMultiDcSettings settings = PersistenceMultiDcSettings.create(system);
      PersistenceMultiDcSettings otherSettings = PersistenceMultiDcSettings.create(otherSystem);

      nodeA = system.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupA, settings), "auction-A-" + testName);
      nodeB = otherSystem.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupB, otherSettings), "auction-B-" + testName);
    }

    /** Asks {@code node} for its highest bid and returns the Bid it replies with. */
    Bid expectHighestBid(ActorRef node) {
      node.tell(GetHighestBid.INSTANCE, getTestActor());
      return expectMsgClass(Bid.class);
    }

    /** Asserts that the highest bid reported by {@code node} has the given bidder and offer. */
    void expectHighestBid(ActorRef node, String bidder, int expected) {
      Bid bid = expectHighestBid(node);
      assertEquals(expected, bid.offer);
      assertEquals(bidder, bid.bidder);
    }

    /** Asserts that {@code node} answers IsClosed with {@code true}. */
    void expectClosed(ActorRef node) {
      node.tell(IsClosed.INSTANCE, getTestActor());
      expectMsg(true);
    }

  }

  // Base configuration shared by both actor systems; never reassigned, hence final.
  private static final Config clusterConfig = ConfigFactory.parseString(String.join("\n",
    "akka.actor.provider = \"cluster\"",
    "akka.remote.netty.tcp.port = 0",
    "akka.remote.classic.netty.tcp.port = 0",
    "akka.remote.artery.canonical.port = 0",
    "akka.remote.artery.canonical.hostname = 127.0.0.1",
    "akka.cluster.jmx.multi-mbeans-in-same-jvm = on",
    // avoid eager init in tests
    "akka.persistence.multi-data-center.hot-standby.enabled = off",
    // FIXME when using Akka 2.6 we should use Jackson or JavaSerializable
    "akka.actor.allow-java-serialization = on",
    "akka.actor.warn-about-java-serializer-usage = off",
    // speed up joining and such
    "akka.cluster.gossip-interval = 500 ms"));

  /**
   * Creates an actor system named "auctionTest" for the data center {@code dc}. The configuration
   * is clusterConfig, falling back to the CassandraLifecycle config and then to the multi-DC test
   * settings generated for {@code dc} out of "DC-A" and "DC-B".
   */
  private static ActorSystem createSystem(String dc) {
    return ActorSystem.create(
      "auctionTest",
      clusterConfig
        .withFallback(CassandraLifecycle.config())
        //#disable-replication
        // Include persistenceMultiDcTestSettings in your ActorSystem configuration
        // to allow suspending replication through PersistenceMultiDcTestKit
        .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B"))));

        //#disable-replication
  }

  //#cassandra-hook
  // Actor system for "DC-A"; assigned in setupSystems
  private static ActorSystem system;
  // Actor system for "DC-B"; assigned in setupSystems
  private static ActorSystem otherSystem;

  /**
   * Creates the two actor systems once for the whole class. Cassandra is started right after the
   * first system exists, and each system is awaited with awaitPersistenceInit before tests run.
   */
  @BeforeClass
  public static void setupSystems() {
    system = createSystem("DC-A");
    CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource());
    CassandraLifecycle.awaitPersistenceInit(system);

    // Cassandra is already running at this point, so only persistence init is awaited
    otherSystem = createSystem("DC-B");
    CassandraLifecycle.awaitPersistenceInit(otherSystem);
  }

  /**
   * Shuts down both actor systems and then stops Cassandra.
   *
   * shutdownActorSystem is called with shutdown verification enabled and can therefore throw;
   * the nested try/finally makes sure that a failure to stop one system neither skips the other
   * system nor leaves the Cassandra instance started in setupSystems running.
   */
  @AfterClass
  public static void tearDownSystems() {
    try {
      TestKit.shutdownActorSystem(system, true);
    } finally {
      try {
        TestKit.shutdownActorSystem(otherSystem, true);
      } finally {
        CassandraLifecycle.stopCassandra();
      }
    }
  }
  //#cassandra-hook

  /** Bids placed in DC-A become visible in DC-B, and the earliest highest bidder stays in front. */
  @Test
  public void propagateHighestBidToReplicatedActor() {
    new TestSetup("test-1") {
      {
        // simple bidding
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", minimumBid); // ebay style, still the minimum offer

        nodeA.tell(new OfferBid("Paul", 41), ActorRef.noSender());
        expectHighestBid(nodeA, "Mary", 41); // ebay style, now updated to the highest counter offer

        awaitAssert(
            () -> {
              // check that results have propagated to b
              expectHighestBid(nodeB, "Mary", 41); // ebay style, now updated to the highest counter offer
              return null;
            }
        );

        // make sure that first bidder still keeps the crown
        nodeB.tell(new OfferBid("c", 42), ActorRef.noSender());
        expectHighestBid(nodeB, "Mary", 42);
      }
    };
  }


  /** Concurrent, different bids in the two DCs converge on the same highest bid in both replicas. */
  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestButDifferentInEachDc() {
    final TestSetup setup = new TestSetup("test-2");
    final ActorRef nobody = ActorRef.noSender();

    // highest bid comes first
    setup.nodeA.tell(new OfferBid("Mary", 42), nobody);
    setup.nodeB.tell(new OfferBid("Paul", 41), nobody);

    setup.awaitAssert(() -> {
      setup.expectHighestBid(setup.nodeA, "Mary", 41);
      setup.expectHighestBid(setup.nodeB, "Mary", 41);
      return null;
    });

    // highest bid comes second
    setup.nodeA.tell(new OfferBid("Paul", 50), nobody);
    setup.nodeB.tell(new OfferBid("Kat", 60), nobody);

    setup.awaitAssert(() -> {
      setup.expectHighestBid(setup.nodeA, "Kat", 50);
      setup.expectHighestBid(setup.nodeB, "Kat", 50);
      return null;
    });
  }

  /** With equal offers made at different times in the two DCs, the earlier bidder wins in both replicas. */
  @Test
  public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestAndEqualButDifferentTimeInEachDc() {
    final TestSetup setup = new TestSetup("test3");
    final ActorRef nobody = ActorRef.noSender();

    // Mary's bid is confirmed in DC-A before Paul offers the same amount in DC-B
    setup.nodeA.tell(new OfferBid("Mary", 15), nobody);
    setup.expectHighestBid(setup.nodeA, "Mary", 12);

    setup.nodeB.tell(new OfferBid("Paul", 15), nobody);

    setup.awaitAssert(() -> {
      setup.expectHighestBid(setup.nodeA, "Mary", 15);
      setup.expectHighestBid(setup.nodeB, "Mary", 15);
      return null;
    });
  }

  /** After Finish is sent to the replica in DC-A, both replicas close with the same winning bid. */
  @Test
  public void eventuallyComeToAConsistentFinalResult() throws Exception {
    new TestSetup("test4") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        // Wait until both bids are visible in both DCs before finishing, instead of sleeping
        // for a fixed time: Paul leads and Mary's 42 is the highest counter offer.
        awaitAssert(() -> {
          expectHighestBid(nodeA, "Paul", 42);
          expectHighestBid(nodeB, "Paul", 42);
          return null;
        });

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeA.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  /** After Finish is sent to the replica in DC-B, both replicas close with the same winning bid. */
  @Test
  public void eventuallyComeToAConsistentFinalResultWhenFinishingIsInitiatedOnNodeB() throws Exception {
    new TestSetup("test5") {
      {
        nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender());
        nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender());

        // Wait until both bids are visible in both DCs before finishing, instead of sleeping
        // for a fixed time: Paul leads and Mary's 42 is the highest counter offer.
        awaitAssert(() -> {
          expectHighestBid(nodeA, "Paul", 42);
          expectHighestBid(nodeB, "Paul", 42);
          return null;
        });

        // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test
        nodeB.tell(Finish.INSTANCE, ActorRef.noSender());

        awaitAssert(() -> {
          expectClosed(nodeA);
          expectClosed(nodeB);
          return null;
        });

        expectHighestBid(nodeA, "Paul", 43);
        expectHighestBid(nodeB, "Paul", 43);
      }
    };
  }

  //#disable-replication
  /** Bids made while replication is suspended are merged once replication is enabled again. */
  @Test
  public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception {
    final TestSetup setup = new TestSetup("test6");
    final ActorRef nobody = ActorRef.noSender();

    PersistenceMultiDcTestKit.disableReplication(otherSystem, system);

    setup.nodeA.tell(new OfferBid("Mary", 42), nobody);
    setup.nodeB.tell(new OfferBid("Paul", 43), nobody);

    // Paul's bid has not reached node A, so Mary leads there at the minimum offer
    setup.expectHighestBid(setup.nodeA, "Mary", 12);

    PersistenceMultiDcTestKit.enableReplication(otherSystem, system);

    setup.awaitAssert(() -> {
      setup.expectHighestBid(setup.nodeA, "Paul", 42);
      return null;
    });
  }
  //#disable-replication
//#cassandra-hook
}
//#cassandra-hook
//#full-example