Auction Example
Having to use CRDTs for replication might seem like a severe constraint because the set of predefined CRDTs is quite limited. In this example we want to show that real-world applications can be implemented by designing events in a way that they don’t conflict. In the end, you will end up with a solution based on a custom CRDT.
We are building a small auction service. An auction is represented by one replicated actor. It should have the following operations:
- Place a bid
- Get the currently highest bid
- Finish the auction
We model those operations as commands to be sent to the auction actor:
- Scala
-
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) // commands sealed trait AuctionCommand case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica case object GetHighestBid extends AuctionCommand case object IsClosed extends AuctionCommand private case object Close extends AuctionCommand // Internal, should not be sent from the outside
- Java
-
static class Bid implements Serializable { final String bidder; final int offer; final Instant timestamp; final String originDc; Bid(String bidder, int offer, Instant timestamp, String originDc) { this.bidder = bidder; this.offer = offer; this.timestamp = timestamp; this.originDc = originDc; } Bid withOffer(int offer) { return new Bid(bidder, offer, timestamp, originDc); } } // commands interface AuctionCommand extends Serializable {} static class OfferBid implements AuctionCommand { final String bidder; final int offer; public OfferBid(String bidder, int offer) { this.bidder = bidder; this.offer = offer; } } // An auction coordinator needs to schedule this event to each replica static class Finish implements AuctionCommand { static final Finish INSTANCE = new Finish(); private Finish() {} } static class GetHighestBid implements AuctionCommand { static final GetHighestBid INSTANCE = new GetHighestBid(); private GetHighestBid() {} } static class IsClosed implements AuctionCommand { static final IsClosed INSTANCE = new IsClosed(); private IsClosed() {} } // Internal, should not be sent from the outside private static class Close implements AuctionCommand { static final Close INSTANCE = new Close(); private Close() {} }
The auction entity is an event-sourced persistent actor. These events are used to persist state changes:
- Scala
-
// events sealed trait AuctionEvent case class BidRegistered(bid: Bid) extends AuctionEvent case class AuctionFinished(atDc: String) extends AuctionEvent case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent
- Java
-
// events interface AuctionEvent extends Serializable {} static class BidRegistered implements AuctionEvent { final Bid bid; public BidRegistered(Bid bid) { this.bid = bid; } } static class AuctionFinished implements AuctionEvent { final String atDc; public AuctionFinished(String atDc) { this.atDc = atDc; } } static class WinnerDecided implements AuctionEvent { final String atDc; final Bid winningBid; final int highestCounterOffer; public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) { this.atDc = atDc; this.winningBid = winningBid; this.highestCounterOffer = highestCounterOffer; } }
You may have noticed here, that we include the highestCounterOffer
in the AuctionFinished
event. This is because we use a popular auction style where the winner does not have to pay the highest bidden price but only just enough to beat the second highest bid.
Let’s have a look at the auction entity that will handle incoming commands:
- Scala
-
case class AuctionSetup( name: String, initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner closingAt: Instant, responsibleForClosing: Boolean) class AuctionEntity(auctionSetup: AuctionSetup) extends ReplicatedEntity[AuctionCommand, AuctionEvent, AuctionState] { override def initialState: AuctionState = AuctionState( phase = Running, highestBid = auctionSetup.initialBid, highestCounterOffer = auctionSetup.initialBid.offer) override def commandHandler: CommandHandler = { CommandHandler.byState { case AuctionState(Running, _, _) => running case _ => finished } } private val running: CommandHandler = { CommandHandler { case (_, _, OfferBid(bidder, offer)) => Effect.persist(BidRegistered(Bid(bidder, offer, Instant.ofEpochMilli(currentTimeMillis()), selfDc))) case (ctx, state, GetHighestBid) => ctx.sender() ! state.highestBid.copy(offer = state.highestCounterOffer) Effect.none case (_, _, Finish) => log.info("Finish") Effect.persist(AuctionFinished(selfDc)) case (_, _, Close) => log.warning("Premature close") // Close should only be triggered when we have already finished Effect.unhandled case (ctx, _, IsClosed) => ctx.sender() ! false Effect.none } } private val finished: CommandHandler = { CommandHandler { case (ctx, state, GetHighestBid) => ctx.sender() ! state.highestBid Effect.none case (ctx, state, IsClosed) => ctx.sender() ! (state.phase == Closed) Effect.none case (ctx, _, Finish) => log.info("Finish") Effect.persist(AuctionFinished(selfDc)) case (_, state, Close) => log.info("Close") require(shouldClose(state)) // TODO send email (before or after persisting) Effect.persist(WinnerDecided(selfDc, state.highestBid, state.highestCounterOffer)) case (_, _, _: OfferBid) => // auction finished, no more bids accepted Effect.unhandled } } override def eventHandler(state: AuctionState, event: AuctionEvent): AuctionState = state.applyEvent(event) private def shouldClose(state: AuctionState): Boolean = { auctionSetup.responsibleForClosing && (state.phase match { case Closing(alreadyFinishedAtDc) => allDcs.diff(alreadyFinishedAtDc).isEmpty case _ => false }) } private def triggerCloseIfNeeded(ctx: ActorContext, state: AuctionState): Unit = { if (shouldClose(state)) ctx.self ! Close } override def eventTrigger( ctx: EventTriggerContext, state: AuctionState, event: AuctionEvent): Effect[AuctionEvent, AuctionState] = { event match { case finished: AuctionFinished => state.phase match { case Closing(alreadyFinishedAtDc) => log.info("AuctionFinished at {}, already finished at [{}]", finished.atDc, alreadyFinishedAtDc.mkString(", ")) if (alreadyFinishedAtDc(selfDc)) { triggerCloseIfNeeded(ctx.actorContext, state) } else { log.info("Sending finish to self") ctx.actorContext.self ! Finish } case _ => // no trigger for this state } case _ => // no trigger for this event } Effect.none } override def recoveryCompleted(ctx: ActorContext, state: AuctionState): Effect[AuctionEvent, AuctionState] = { triggerCloseIfNeeded(ctx, state) val millisUntilClosing = auctionSetup.closingAt.toEpochMilli - currentTimeMillis() ctx.timers.startSingleTimer(FinishTimer, Finish, millisUntilClosing.millis) Effect.none } }
- Java
-
static class AuctionSetup { final String name; final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner final Instant closingAt; final boolean responsibleForClosing; public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) { this.name = name; this.initialBid = initialBid; this.closingAt = closingAt; this.responsibleForClosing = responsibleForClosing; } } static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> { final AuctionSetup auctionSetup; public AuctionEntity(AuctionSetup auctionSetup) { this.auctionSetup = auctionSetup; } @Override public AuctionState initialState() { return new AuctionState( true, auctionSetup.initialBid, auctionSetup.initialBid.offer, Collections.emptySet()); } @Override public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() { return byStateCommandHandlerBuilder(AuctionState.class) .matchState(state -> state.stillRunning, running()) .matchAny(finished()); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { return Effect().persist(new BidRegistered(new Bid(offerBid.bidder, offerBid.offer, Instant.ofEpochMilli(currentTimeMillis()), selfDc()))); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid.withOffer(state.highestCounterOffer), getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().warning("Premature close"); // Close should only be triggered when we have already finished return Effect().unhandled(); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(false, getSelf()); return Effect().none(); }).build(); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { // auction finished, no more bids accepted return Effect().unhandled(); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid, getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().info("Close"); // TODO send email (before or after persisting) return Effect().persist(new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer)); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.isClosed(), getSelf()); return Effect().none(); }).build(); } @Override public EventHandler<AuctionEvent, AuctionState> eventHandler() { return eventHandlerBuilder(AuctionEvent.class) .matchEvent(BidRegistered.class, (state, event) -> { if (AuctionState.isHigherBid(event.bid, state.highestBid)) { return state.withNewHighestBid(event.bid); } else { return state.withTooLowBid(event.bid); } }).matchEvent(AuctionFinished.class, (state, event) -> { if (state.isClosed()) return state; // already closed else return state.addFinishedAtDc(event.atDc); }).matchEvent(WinnerDecided.class, (state, event) -> { return state.close(); }) .build(); } private boolean shouldClose(AuctionState state) { return auctionSetup.responsibleForClosing && !state.isClosed() && getAllDcs().equals(state.finishedAtDc); } private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) { if (shouldClose(state)) ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf()); } @Override public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx, AuctionState state, AuctionEvent event) { if (event instanceof AuctionFinished && !state.isClosed()) { AuctionFinished finished = (AuctionFinished) event; log().info("AuctionFinished at {}, already finished at [{}]", finished.atDc, state.finishedAtDc); ActorContext actorCtx = ctx.actorContext(); if (state.finishedAtDc.contains(getSelfDc())) { triggerCloseIfNeeded(actorCtx, state); } else { log().info("Sending finish to self"); actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf()); } } return Effect().none(); } @Override public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) { triggerCloseIfNeeded(ctx, state); long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - currentTimeMillis(); ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE, Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS)); return Effect().none(); } }
The auction entity is started with the initial parameters for the auction. As seen before, replicated entities need to be parameterized with the types for commands and events and also for the internal state.
In the initialState
method, a replicated entity needs to define its original state. In our case, it’s straightforward to initialize the initial state from our initialization parameters as given in the AuctionSetup
instance. The minimum bid is in our case modelled as an initialBid
.
The actions
defines how to react to external commands. In our case, for OfferBid
and AuctionFinished
we do nothing more than to emit events corresponding to the command. For GetHighestBid
we respond with details from the state. Note, that we overwrite the actual offer of the highest bid here with the amount of the highestCounterOffer
. This is done to follow the popular auction style where the actual highest bid is never publicly revealed.
Let’s have a look at our state class, AuctionState
which also represents the CRDT in our example.
- Scala
-
private val FinishTimer = "FinishTimer" case class AuctionState( phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer ) { def applyEvent(event: AuctionEvent): AuctionState = event match { case BidRegistered(b) => if (isHigherBid(b, highestBid)) withNewHighestBid(b) else withTooLowBid(b) case AuctionFinished(atDc) => phase match { case Running => copy(phase = Closing(Set(atDc))) case Closing(alreadyFinishedDcs) => copy(phase = Closing(alreadyFinishedDcs + atDc)) case _ => this } case _: WinnerDecided => copy(phase = Closed) } def withNewHighestBid(bid: Bid): AuctionState = { require(phase != Closed) require(isHigherBid(bid, highestBid)) copy( highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around ) } def withTooLowBid(bid: Bid): AuctionState = { require(phase != Closed) require(isHigherBid(highestBid, bid)) copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer } def isHigherBid(first: Bid, second: Bid): Boolean = first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0) } - Java
-
static class AuctionState { final boolean stillRunning; final Bid highestBid; // in ebay style auctions, we need to keep track of current highest counter offer final int highestCounterOffer; final Set<String> finishedAtDc; AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) { this.stillRunning = stillRunning; this.highestBid = highestBid; this.highestCounterOffer = highestCounterOffer; this.finishedAtDc = finishedAtDc; } AuctionState withNewHighestBid(Bid bid) { assert(stillRunning); assert(isHigherBid(bid, highestBid)); return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around } AuctionState withTooLowBid(Bid bid) { assert(stillRunning); assert(isHigherBid(highestBid, bid)); return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc); } static Boolean isHigherBid(Bid first, Bid second) { return first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0); } AuctionState addFinishedAtDc(String dc) { Set<String> s = new HashSet<>(finishedAtDc); s.add(dc); return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s)); } public AuctionState close() { return new AuctionState( false, highestBid, highestCounterOffer, Collections.emptySet() ); } public boolean isClosed() { return !stillRunning && finishedAtDc.isEmpty(); } }
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid, and the highest counter offer so far.
In the eventHandler
, we handle persisted events to drive the state change. When a new bid is registered,
- it needs to be decided whether the new bid is the winning bid or not
- the state needs to be updated accordingly
The point of CRDTs is that the state must be end up being the same regardless of the order the events have been processed. We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an ordering on all bids, it should suffice to compare the new bid with currently highest to eventually end up with the globally highest regardless of the order in which the events come in.
The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state outside of our state class so that all replicas come to the same result. We define the ordering as this:
- A higher bid wins.
- If there’s a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the (local) timestamp the bid was registered.
- We need to make sure that no timestamp is used twice in the same DC (missing in this example).
- If there’s a tie between the timestamp, we define an arbitrary but deterministic ordering on the DCs, in our case we just compare the name strings of the DCs. That’s why we need to keep the identifier of the DC where a bid was registered for every
Bid
.
If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the highestCounterOffer
. If the new bid was lower, we just update the highestCounterOffer
if necessary.
Using those rules, the order of incoming does not matter. Replicas in all DCs should eventually converge to the same result.
Open questions
The auction example shows basic features of an auction. There are a few additional considerations
- Replica only eventually converge to the same result. That might lead to surprising results because highest bids from other replicas than the local one might only turn up with a delay. Another surprising result might be that two bids with the same amount issued each to different replicas in quick succession might be ordered differently due clock differences between replicas. In a real bidding system, it needs to be made sure that no replica has a competitive advantage over another one.
Complete example source code
For reference here’s the complete example, including imports and tests:
- Scala
-
/** * Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com> */ package akka.persistence.multidc.scaladsl //#full-example import java.time.Instant import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor.{ ActorRef, ActorSystem, Props } import akka.persistence.multidc.PersistenceMultiDcSettings import org.scalatest.BeforeAndAfterAll import org.scalatest.Matchers import org.scalatest.WordSpecLike import org.scalatest.BeforeAndAfter import akka.persistence.multidc.testkit._ //#disable-replication import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit._ //#disable-replication import akka.testkit.ImplicitSender import akka.testkit.TestKit object AuctionExampleSpec { //#full-example type MoneyAmount = Int //#auction-commands case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: String) // commands sealed trait AuctionCommand case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica case object GetHighestBid extends AuctionCommand case object IsClosed extends AuctionCommand private case object Close extends AuctionCommand // Internal, should not be sent from the outside //#auction-commands //#auction-events // events sealed trait AuctionEvent case class BidRegistered(bid: Bid) extends AuctionEvent case class AuctionFinished(atDc: String) extends AuctionEvent case class WinnerDecided(atDc: String, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent //#auction-events def auctionProps(pid: String, auctionSetup: AuctionSetup, settings: PersistenceMultiDcSettings): Props = ReplicatedEntity.props("auction", pid, () => new AuctionEntity(auctionSetup), settings) /** * The auction passes through several workflow phases. * First, in `Running` `OfferBid` commands are accepted. * * `AuctionEntity` instances in all DCs schedule a `Finish` command * at a given time. That persists the `AuctionFinished` event and the * phase is in `Closing` until the auction is finished in all DCs. * * When the auction has been finished no more `OfferBid` commands are accepted. * * The auction is also finished immediately if `AuctionFinished` event from another * DC is seen before the scheduled `Finish` command. In that way the auction is finished * as quickly as possible in all DCs even though there might be some clock skew. * * One DC is responsible for finally deciding the winner and publishing the result. * All events must be collected from all DC before that can happen. * When the responsible DC has seen all `AuctionFinished` events from other DCs * all other events have also been propagated and it can persist `WinnerDecided` and * the auction is finally `Closed`. * */ sealed trait AuctionPhase case object Running extends AuctionPhase case class Closing(finishedAtDc: Set[String]) extends AuctionPhase case object Closed extends AuctionPhase //#auction-state private val FinishTimer = "FinishTimer" case class AuctionState( phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer ) { def applyEvent(event: AuctionEvent): AuctionState = event match { case BidRegistered(b) => if (isHigherBid(b, highestBid)) withNewHighestBid(b) else withTooLowBid(b) case AuctionFinished(atDc) => phase match { case Running => copy(phase = Closing(Set(atDc))) case Closing(alreadyFinishedDcs) => copy(phase = Closing(alreadyFinishedDcs + atDc)) case _ => this } case _: WinnerDecided => copy(phase = Closed) } def withNewHighestBid(bid: Bid): AuctionState = { require(phase != Closed) require(isHigherBid(bid, highestBid)) copy( highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around ) } def withTooLowBid(bid: Bid): AuctionState = { require(phase != Closed) require(isHigherBid(highestBid, bid)) copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer } def isHigherBid(first: Bid, second: Bid): Boolean = first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0) } //#auction-state //#auction-actor case class AuctionSetup( name: String, initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner closingAt: Instant, responsibleForClosing: Boolean) class AuctionEntity(auctionSetup: AuctionSetup) extends ReplicatedEntity[AuctionCommand, AuctionEvent, AuctionState] { override def initialState: AuctionState = AuctionState( phase = Running, highestBid = auctionSetup.initialBid, highestCounterOffer = auctionSetup.initialBid.offer) override def commandHandler: CommandHandler = { CommandHandler.byState { case AuctionState(Running, _, _) => running case _ => finished } } private val running: CommandHandler = { CommandHandler { case (_, _, OfferBid(bidder, offer)) => Effect.persist(BidRegistered(Bid(bidder, offer, Instant.ofEpochMilli(currentTimeMillis()), selfDc))) case (ctx, state, GetHighestBid) => ctx.sender() ! state.highestBid.copy(offer = state.highestCounterOffer) Effect.none case (_, _, Finish) => log.info("Finish") Effect.persist(AuctionFinished(selfDc)) case (_, _, Close) => log.warning("Premature close") // Close should only be triggered when we have already finished Effect.unhandled case (ctx, _, IsClosed) => ctx.sender() ! false Effect.none } } private val finished: CommandHandler = { CommandHandler { case (ctx, state, GetHighestBid) => ctx.sender() ! state.highestBid Effect.none case (ctx, state, IsClosed) => ctx.sender() ! (state.phase == Closed) Effect.none case (ctx, _, Finish) => log.info("Finish") Effect.persist(AuctionFinished(selfDc)) case (_, state, Close) => log.info("Close") require(shouldClose(state)) // TODO send email (before or after persisting) Effect.persist(WinnerDecided(selfDc, state.highestBid, state.highestCounterOffer)) case (_, _, _: OfferBid) => // auction finished, no more bids accepted Effect.unhandled } } override def eventHandler(state: AuctionState, event: AuctionEvent): AuctionState = state.applyEvent(event) private def shouldClose(state: AuctionState): Boolean = { auctionSetup.responsibleForClosing && (state.phase match { case Closing(alreadyFinishedAtDc) => allDcs.diff(alreadyFinishedAtDc).isEmpty case _ => false }) } private def triggerCloseIfNeeded(ctx: ActorContext, state: AuctionState): Unit = { if (shouldClose(state)) ctx.self ! Close } override def eventTrigger( ctx: EventTriggerContext, state: AuctionState, event: AuctionEvent): Effect[AuctionEvent, AuctionState] = { event match { case finished: AuctionFinished => state.phase match { case Closing(alreadyFinishedAtDc) => log.info("AuctionFinished at {}, already finished at [{}]", finished.atDc, alreadyFinishedAtDc.mkString(", ")) if (alreadyFinishedAtDc(selfDc)) { triggerCloseIfNeeded(ctx.actorContext, state) } else { log.info("Sending finish to self") ctx.actorContext.self ! Finish } case _ => // no trigger for this state } case _ => // no trigger for this event } Effect.none } override def recoveryCompleted(ctx: ActorContext, state: AuctionState): Effect[AuctionEvent, AuctionState] = { triggerCloseIfNeeded(ctx, state) val millisUntilClosing = auctionSetup.closingAt.toEpochMilli - currentTimeMillis() ctx.timers.startSingleTimer(FinishTimer, Finish, millisUntilClosing.millis) Effect.none } } //#auction-actor //#full-example val allDcs = Seq("DC-A", "DC-B") val clusterConfig = ConfigFactory.parseString(""" akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster.jmx.multi-mbeans-in-same-jvm = on # avoid eager init in tests akka.persistence.multi-data-center.hot-standby.enabled = off # FIXME when using Akka 2.6 we should use Jackson or JavaSerializable akka.actor.allow-java-serialization = on akka.actor.warn-about-java-serializer-usage = off # speed up joining and such akka.cluster.gossip-interval = 500 ms """) def createSystem(dc: String = allDcs.head) = ActorSystem( "AuctionExampleSpec", clusterConfig .withFallback(CassandraLifecycle.config) //#disable-replication // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(persistenceMultiDcTestSettings("AuctionExampleSpec", dc, allDcs))) //#disable-replication } class AuctionExampleSpec extends TestKit(AuctionExampleSpec.createSystem()) with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfter with CassandraLifecycleScalatest { import AuctionExampleSpec._ val systemName = "AuctionExampleSpec" val settings = PersistenceMultiDcSettings(system) val otherSystem = createSystem(allDcs(1)) val otherSettings = PersistenceMultiDcSettings(otherSystem) val allSystems = Seq(system, otherSystem) class TestSetup(testName: String) { val minimumBid = 12 val auctionSetupA = AuctionSetup("bicycle", Bid("me", minimumBid, Instant.now(), ""), closingAt = Instant.now().plusSeconds(60), responsibleForClosing = true) val auctionSetupB = auctionSetupA.copy(responsibleForClosing = false) val nodeA = system.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupA, settings), s"auction-A-$testName") val nodeB = otherSystem.actorOf(auctionProps(s"bikeAuction-$testName", auctionSetupB, otherSettings), s"auction-B-$testName") def expectHighestBid(node: ActorRef): Bid = { node ! GetHighestBid expectMsgType[Bid] } def expectHighestBid(node: ActorRef, bidder: String, expected: MoneyAmount): Unit = { val bid = expectHighestBid(node) bid.offer shouldEqual expected bid.bidder shouldEqual bidder } def expectClosed(node: ActorRef): Unit = { node ! IsClosed expectMsg(true) } } "AuctionExample" should { "propagate highest bid to replicated actor" in new TestSetup("test1") { // simple bidding nodeA ! OfferBid("Mary", 42) expectHighestBid(nodeA, "Mary", minimumBid) // ebay style, still the minimum offer nodeA ! OfferBid("Paul", 41) expectHighestBid(nodeA, "Mary", 41) // ebay style, now updated to the highest counter offer awaitAssert { // check that results have propagated to b expectHighestBid(nodeB, "Mary", 41) // ebay style, now updated to the highest counter offer } // make sure that first bidder still keeps the crown nodeB ! OfferBid("c", 42) expectHighestBid(nodeB, "Mary", 42) } "eventually resolve conflicting bids during auction if bids are highest (but different) in each dc" in new TestSetup("test2") { // highest bid comes first nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 41) awaitAssert { expectHighestBid(nodeA, "Mary", 41) expectHighestBid(nodeB, "Mary", 41) } // highest bid comes second nodeA ! OfferBid("Paul", 50) nodeB ! OfferBid("Kat", 60) awaitAssert { expectHighestBid(nodeA, "Kat", 50) expectHighestBid(nodeB, "Kat", 50) } } "eventually resolve conflicting bids during auction if bids are highest and equal (but different time) in each dc" in new TestSetup("test3") { nodeA ! OfferBid("Mary", 15) expectHighestBid(nodeA, "Mary", 12) nodeB ! OfferBid("Paul", 15) awaitAssert { expectHighestBid(nodeA, "Mary", 15) expectHighestBid(nodeB, "Mary", 15) } } "eventually come to a consistent final result" in new TestSetup("test4") { nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) Thread.sleep(3000) // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeA ! Finish awaitAssert { expectClosed(nodeA) expectClosed(nodeB) } expectHighestBid(nodeA, "Paul", 43) expectHighestBid(nodeB, "Paul", 43) } "eventually come to a consistent final result when finishing is initiated on node B" in new TestSetup("test5") { nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) Thread.sleep(3000) // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeB ! Finish awaitAssert { expectClosed(nodeA) expectClosed(nodeB) } expectHighestBid(nodeA, "Paul", 43) expectHighestBid(nodeB, "Paul", 43) } //#disable-replication "correctly resolve concurrent writes when replication is temporarily suspended" in new TestSetup("test6") { disableReplication(otherSystem, system) nodeA ! OfferBid("Mary", 42) nodeB ! OfferBid("Paul", 43) expectHighestBid(nodeA, "Mary", 12) enableReplication(otherSystem, system) awaitAssert { expectHighestBid(nodeA, "Paul", 42) } } //#disable-replication } after { allSystems.foreach { sys => enableAllReplicationTo(sys) } } override protected def afterAll(): Unit = { allSystems.foreach(sys => shutdown(sys)) super.afterAll() } } //#full-example
- Java
-
/* * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com> */ package akka.persistence.multidc.javadsl; //#full-example import java.io.Serializable; import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.JAPI; import akka.persistence.cassandra.testkit.CassandraLauncher; import akka.persistence.multidc.PersistenceMultiDcSettings; import akka.testkit.javadsl.TestKit; //#cassandra-hook import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.scalatest.junit.JUnitSuite; import akka.persistence.multidc.testkit.CassandraLifecycle; //#disable-replication import akka.persistence.multidc.testkit.PersistenceMultiDcTestKit; //#disable-replication //#cassandra-hook import scala.concurrent.duration.Duration; import static org.junit.Assert.assertEquals; //#cassandra-hook public class AuctionExampleTest extends JUnitSuite { //#cassandra-hook //#auction-commands static class Bid implements Serializable { final String bidder; final int offer; final Instant timestamp; final String originDc; Bid(String bidder, int offer, Instant timestamp, String originDc) { this.bidder = bidder; this.offer = offer; this.timestamp = timestamp; this.originDc = originDc; } Bid withOffer(int offer) { return new Bid(bidder, offer, timestamp, originDc); } } // commands interface AuctionCommand extends Serializable {} static class OfferBid implements AuctionCommand { final String bidder; final int offer; public OfferBid(String bidder, int offer) { this.bidder = bidder; this.offer = offer; } } // An auction coordinator needs to schedule this event to each replica static class Finish implements AuctionCommand { static final Finish INSTANCE = new Finish(); private Finish() {} } static class GetHighestBid implements AuctionCommand { static final GetHighestBid INSTANCE = new GetHighestBid(); private GetHighestBid() {} } static class IsClosed implements AuctionCommand { static final IsClosed INSTANCE = new IsClosed(); private IsClosed() {} } // Internal, should not be sent from the outside private static class Close implements AuctionCommand { static final Close INSTANCE = new Close(); private Close() {} } //#auction-commands //#auction-events // events interface AuctionEvent extends Serializable {} static class BidRegistered implements AuctionEvent { final Bid bid; public BidRegistered(Bid bid) { this.bid = bid; } } static class AuctionFinished implements AuctionEvent { final String atDc; public AuctionFinished(String atDc) { this.atDc = atDc; } } static class WinnerDecided implements AuctionEvent { final String atDc; final Bid winningBid; final int highestCounterOffer; public WinnerDecided(String atDc, Bid winningBid, int highestCounterOffer) { this.atDc = atDc; this.winningBid = winningBid; this.highestCounterOffer = highestCounterOffer; } } //#auction-events //#auction-state static class AuctionState { final boolean stillRunning; final Bid highestBid; // in ebay style auctions, we need to keep track of current highest counter offer final int highestCounterOffer; final Set<String> finishedAtDc; AuctionState(boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) { this.stillRunning = stillRunning; this.highestBid = highestBid; this.highestCounterOffer = highestCounterOffer; this.finishedAtDc = finishedAtDc; } AuctionState withNewHighestBid(Bid bid) { assert(stillRunning); assert(isHigherBid(bid, highestBid)); return new AuctionState(stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around } AuctionState withTooLowBid(Bid bid) { assert(stillRunning); assert(isHigherBid(highestBid, bid)); return new AuctionState(stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc); } static Boolean isHigherBid(Bid first, Bid second) { return first.offer > second.offer || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.compareTo(second.originDc) < 0); } AuctionState addFinishedAtDc(String dc) { Set<String> s = new HashSet<>(finishedAtDc); s.add(dc); return new AuctionState(false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s)); } public AuctionState close() { return new AuctionState( false, highestBid, highestCounterOffer, Collections.emptySet() ); } public boolean isClosed() { return !stillRunning && finishedAtDc.isEmpty(); } } //#auction-state private static final String FINISH_TIMER = "FinishTimer"; //#auction-actor static class AuctionSetup { final String name; final Bid initialBid; // the initial bid is basically the minimum price bidden at start time by the owner final Instant closingAt; final boolean responsibleForClosing; public AuctionSetup(String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) { this.name = name; this.initialBid = initialBid; this.closingAt = closingAt; this.responsibleForClosing = responsibleForClosing; } } static class AuctionEntity extends ReplicatedEntity<AuctionCommand, AuctionEvent, AuctionState> { final AuctionSetup auctionSetup; public AuctionEntity(AuctionSetup auctionSetup) { this.auctionSetup = auctionSetup; } @Override public AuctionState initialState() { return new AuctionState( true, auctionSetup.initialBid, auctionSetup.initialBid.offer, Collections.emptySet()); } @Override public CommandHandler<AuctionCommand, AuctionEvent, AuctionState> commandHandler() { return byStateCommandHandlerBuilder(AuctionState.class) .matchState(state -> state.stillRunning, running()) .matchAny(finished()); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> running() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { return Effect().persist(new BidRegistered(new Bid(offerBid.bidder, offerBid.offer, Instant.ofEpochMilli(currentTimeMillis()), selfDc()))); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid.withOffer(state.highestCounterOffer), getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().warning("Premature close"); // Close should only be triggered when we have already finished return Effect().unhandled(); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(false, getSelf()); return Effect().none(); }).build(); } private CommandHandler<AuctionCommand, AuctionEvent, AuctionState> finished() { return commandHandlerBuilder(AuctionCommand.class) .matchCommand(OfferBid.class, (ctx, state, offerBid) -> { // auction finished, no more bids accepted return Effect().unhandled(); }).matchExactCommand(GetHighestBid.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.highestBid, getSelf()); return Effect().none(); }).matchExactCommand(Finish.INSTANCE, (ctx, state, finish) -> { log().info("Finish"); return Effect().persist(new AuctionFinished(selfDc())); }).matchExactCommand(Close.INSTANCE, (ctx, state, close) -> { log().info("Close"); // TODO send email (before or after persisting) return Effect().persist(new WinnerDecided(selfDc(), state.highestBid, state.highestCounterOffer)); }).matchExactCommand(IsClosed.INSTANCE, (ctx, state, query) -> { ctx.getSender().tell(state.isClosed(), getSelf()); return Effect().none(); }).build(); } @Override public EventHandler<AuctionEvent, AuctionState> eventHandler() { return eventHandlerBuilder(AuctionEvent.class) .matchEvent(BidRegistered.class, (state, event) -> { if (AuctionState.isHigherBid(event.bid, state.highestBid)) { return state.withNewHighestBid(event.bid); } else { return state.withTooLowBid(event.bid); } }).matchEvent(AuctionFinished.class, (state, event) -> { if (state.isClosed()) return state; // already closed else return state.addFinishedAtDc(event.atDc); }).matchEvent(WinnerDecided.class, (state, event) -> { return state.close(); }) .build(); } private boolean shouldClose(AuctionState state) { return auctionSetup.responsibleForClosing && !state.isClosed() && getAllDcs().equals(state.finishedAtDc); } private void triggerCloseIfNeeded(ActorContext ctx, AuctionState state) { if (shouldClose(state)) ctx.getSelf().tell(Close.INSTANCE, ctx.getSelf()); } @Override public Effect<AuctionEvent, AuctionState> eventTrigger(EventTriggerContext ctx, AuctionState state, AuctionEvent event) { if (event instanceof AuctionFinished && !state.isClosed()) { AuctionFinished finished = (AuctionFinished) event; log().info("AuctionFinished at {}, already finished at [{}]", finished.atDc, state.finishedAtDc); ActorContext actorCtx = ctx.actorContext(); if (state.finishedAtDc.contains(getSelfDc())) { triggerCloseIfNeeded(actorCtx, state); } else { log().info("Sending finish to self"); actorCtx.getSelf().tell(Finish.INSTANCE, actorCtx.getSelf()); } } return Effect().none(); } @Override public Effect<AuctionEvent, AuctionState> recoveryCompleted(ActorContext ctx, AuctionState state) { triggerCloseIfNeeded(ctx, state); long millisUntilClosing = auctionSetup.closingAt.toEpochMilli() - currentTimeMillis(); ctx.getTimers().startSingleTimer(FINISH_TIMER, Finish.INSTANCE, Duration.create(millisUntilClosing, TimeUnit.MILLISECONDS)); return Effect().none(); } } //#auction-actor static Props auctionProps(String pid, AuctionSetup auctionSetup, PersistenceMultiDcSettings settings) { return ReplicatedEntity.props( AuctionCommand.class, "auction", pid, () -> new AuctionEntity(auctionSetup), settings); } static private class TestSetup extends TestKit { final String testName; final int minimumBid = 12; final AuctionSetup auctionSetupA; final AuctionSetup auctionSetupB; final ActorRef nodeA; final ActorRef nodeB; TestSetup(String testName) { super(system); this.testName = testName; auctionSetupA = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), true); auctionSetupB = new AuctionSetup("bicycle", new Bid("me", minimumBid, Instant.now(), ""), Instant.now().plusSeconds(60), false); PersistenceMultiDcSettings settings = PersistenceMultiDcSettings.create(system); PersistenceMultiDcSettings otherSettings = PersistenceMultiDcSettings.create(otherSystem); nodeA = system.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupA, settings), "auction-A-" + testName); nodeB = otherSystem.actorOf(auctionProps("bikeAuction-" + testName, auctionSetupB, otherSettings), "auction-B-" + testName); } Bid expectHighestBid(ActorRef node) { node.tell(GetHighestBid.INSTANCE, getTestActor()); return expectMsgClass(Bid.class); } void expectHighestBid(ActorRef node, String bidder, int expected) { Bid bid = expectHighestBid(node); assertEquals(expected, bid.offer); assertEquals(bidder, bid.bidder); } void expectClosed(ActorRef node) { node.tell(IsClosed.INSTANCE, getTestActor()); expectMsg(true); } } private static Config clusterConfig = ConfigFactory.parseString(String.join("\n", "akka.actor.provider = \"cluster\"", "akka.remote.netty.tcp.port = 0", "akka.remote.classic.netty.tcp.port = 0", "akka.remote.artery.canonical.port = 0", "akka.remote.artery.canonical.hostname = 127.0.0.1", "akka.cluster.jmx.multi-mbeans-in-same-jvm = on", // avoid eager init in tests "akka.persistence.multi-data-center.hot-standby.enabled = off", // FIXME when using Akka 2.6 we should use Jackson or JavaSerializable "akka.actor.allow-java-serialization = on", "akka.actor.warn-about-java-serializer-usage = off", // speed up joining and such "akka.cluster.gossip-interval = 500 ms")); private static ActorSystem createSystem(String dc) { return ActorSystem.create( "auctionTest", clusterConfig .withFallback(CassandraLifecycle.config()) //#disable-replication // Include persistenceMultiDcTestSettings in your ActorSystem configuration // to allow suspending replication through PersistenceMultiDcTestKit .withFallback(PersistenceMultiDcTestKit.persistenceMultiDcTestSettings("auctionTest", dc, JAPI.seq("DC-A", "DC-B")))); //#disable-replication } //#cassandra-hook private static ActorSystem system; private static ActorSystem otherSystem; @BeforeClass public static void setupSystems() { system = createSystem("DC-A"); CassandraLifecycle.startCassandra(system.name(), CassandraLauncher.DefaultTestConfigResource()); CassandraLifecycle.awaitPersistenceInit(system); otherSystem = createSystem("DC-B"); CassandraLifecycle.awaitPersistenceInit(otherSystem); } @AfterClass public static void tearDownSystems() { TestKit.shutdownActorSystem(system, true); TestKit.shutdownActorSystem(otherSystem, true); CassandraLifecycle.stopCassandra(); } //#cassandra-hook @Test public void propagateHighestBidToReplicatedActor() { new TestSetup("test-1") { { // simple bidding nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", minimumBid); // ebay style, still the minimum offer nodeA.tell(new OfferBid("Paul", 41), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 41); // ebay style, now updated to the highest counter offer awaitAssert( () -> { expectHighestBid(nodeB, "Mary", 41); return null; }); awaitAssert( () -> { // check that results have propagated to b expectHighestBid(nodeB, "Mary", 41); // ebay style, now updated to the highest counter offer return null; } ); // make sure that first bidder still keeps the crown nodeB.tell(new OfferBid("c", 42), ActorRef.noSender()); expectHighestBid(nodeB, "Mary", 42); } }; } @Test public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestButDifferentInEachDc() { new TestSetup("test-2") { { // highest bid comes first nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 41), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Mary", 41); expectHighestBid(nodeB, "Mary", 41); return null; }); // highest bid comes second nodeA.tell(new OfferBid("Paul", 50), ActorRef.noSender()); nodeB.tell(new OfferBid("Kat", 60), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Kat", 50); expectHighestBid(nodeB, "Kat", 50); return null; }); } }; } @Test public void eventuallyResolveConflictingBidsDuringAuctionIfBidsAreHighestAndEqualButDifferentTimeInEachDc() { new TestSetup("test3") { { nodeA.tell(new OfferBid("Mary", 15), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 12); nodeB.tell(new OfferBid("Paul", 15), ActorRef.noSender()); awaitAssert(() -> { expectHighestBid(nodeA, "Mary", 15); expectHighestBid(nodeB, "Mary", 15); return null; }); } }; } @Test public void eventuallyComeToAConsistentFinalResult() throws Exception { new TestSetup("test4") { { nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); Thread.sleep(3000); // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeA.tell(Finish.INSTANCE, ActorRef.noSender()); awaitAssert(() -> { expectClosed(nodeA); expectClosed(nodeB); return null; }); expectHighestBid(nodeA, "Paul", 43); expectHighestBid(nodeB, "Paul", 43); } }; } @Test public void eventuallyComeToAConsistentFinalResultWhenFinishingIsInitiatedOnNodeB() throws Exception { new TestSetup("test5") { { nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); Thread.sleep(3000); // Finish is scheduled by the AuctionEntity, but we can do it earlier from the test nodeB.tell(Finish.INSTANCE, ActorRef.noSender()); awaitAssert(() -> { expectClosed(nodeA); expectClosed(nodeB); return null; }); expectHighestBid(nodeA, "Paul", 43); expectHighestBid(nodeB, "Paul", 43); } }; } //#disable-replication @Test public void correctlyResolveConcurrentWritesWhenReplicationIsTemporarilySuspended() throws Exception { new TestSetup("test6") { { PersistenceMultiDcTestKit.disableReplication(otherSystem, system); nodeA.tell(new OfferBid("Mary", 42), ActorRef.noSender()); nodeB.tell(new OfferBid("Paul", 43), ActorRef.noSender()); expectHighestBid(nodeA, "Mary", 12); PersistenceMultiDcTestKit.enableReplication(otherSystem, system); awaitAssert(() -> { expectHighestBid(nodeA, "Paul", 42); return null; }); } }; } //#disable-replication //#cassandra-hook } //#cassandra-hook //#full-example