Shopping Cart Example
The provided CRDT data structures can be used as the root state of a ReplicatedEntity
but they can also be nested inside another data structure. This requires a bit more careful thinking about the eventual consistency.
In this sample we model a shopping cart as a map of product ids and the number of that product added or removed in the shopping cart. By using the Counter
CRDT and persisting its Update
in our events we can be sure that an add or remove of items in any data center will eventually lead to all data centers ending up with the same number of each product.
With this model we can not have a ClearCart
command as that could give different states in different data centers. It is quite easy to imagine such a scenario: commands arriving in the order ClearCart
, AddItem('a', 5)
in one data center and the order AddItem('a', 5), ClearCart
in another.
To clear a cart a client would instead have to remove as many items of each product as it sees in the cart at the time of removal.
Required imports:
- Scala
-
import java.util.UUID import akka.persistence.multidc.crdt.Counter import akka.persistence.multidc.crdt.Counter.Updated import akka.persistence.multidc.{ BaseSpec, PersistenceMultiDcSettings }
- Java
-
import java.io.Serializable; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.japi.JAPI; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import akka.persistence.multidc.PersistenceMultiDcSettings; import akka.persistence.multidc.crdt.Counter; import akka.persistence.multidc.testkit.CassandraLifecycle;
The ReplicatedEntity
implementation:
- Scala
-
type ProductId = String sealed trait CartEvent final case class ItemUpdated(id: ProductId, update: Updated) extends CartEvent sealed trait CartCommand final case class AddItem(id: ProductId, count: Int) extends CartCommand final case class RemoveItem(id: ProductId, count: Int) extends CartCommand final case object GetCartItems extends CartCommand final case class CartItems(items: Map[ProductId, Int]) def props(cartId: String, settings: PersistenceMultiDcSettings) = ReplicatedEntity.props("cart", cartId, () => new ShoppingCart, settings) final class ShoppingCart extends ReplicatedEntity[CartCommand, CartEvent, Map[ProductId, Counter]] { override def initialState: Map[ProductId, Counter] = Map.empty override def commandHandler: CommandHandler = CommandHandler { case (_, _, AddItem(productId, count)) => Effect.persist(ItemUpdated(productId, Updated(count))) case (_, _, RemoveItem(productId, count)) => Effect.persist(ItemUpdated(productId, Updated(-count))) case (ctx, state, GetCartItems) => val items = state.collect { case (id, counter) if counter.value > 0 => id -> counter.value.toInt } ctx.sender() ! CartItems(items) Effect.none } override def eventHandler(state: Map[ProductId, Counter], event: CartEvent): Map[ProductId, Counter] = { event match { case ItemUpdated(id, update) => state.get(id) match { case Some(counter) => state + (id -> counter.applyOperation(update)) case None => state + (id -> Counter.empty.applyOperation(update)) } } } } - Java
-
public interface CartEvent {} public static final class ItemUpdated implements CartEvent, Serializable { public final String productId; public final Counter.Updated update; public ItemUpdated(String productId, Counter.Updated update) { this.productId = productId; this.update = update; } } public interface CartCommand {} public static final class AddItem implements CartCommand { public final String productId; public final int count; public AddItem(String productId, int count) { this.productId = productId; this.count = count; } } public static final class RemoveItem implements CartCommand { public final String productId; public final int count; public RemoveItem(String productId, int count) { this.productId = productId; this.count = count; } } public static final CartCommand GET_CART_ITEMS = new CartCommand() {}; public static final class CartItems { public final Map<String, Integer> items; public CartItems(Map<String, Integer> items) { this.items = items; } } public static Props props(String cartId, PersistenceMultiDcSettings settings) { return ReplicatedEntity.props(CartCommand.class, "cart", cartId, ShoppingCart::new, settings); } private static class ShoppingCart extends ReplicatedEntity<CartCommand, CartEvent, Map<String, Counter>> { @Override public Map<String, Counter> initialState() { // the state must be immutable, to avoid pulling in an additional dependency in this sample // we just wrap the map as unmodifiable to guarantee we get an error on accidental modification // however in an actual application a third party immutable collection would be preferable return Collections.unmodifiableMap(new HashMap<>()); } @Override public CommandHandler<CartCommand, CartEvent, Map<String, Counter>> commandHandler() { return commandHandlerBuilder(CartCommand.class) .matchCommand(AddItem.class, (ctx, state, cmd) -> Effect().persist(new ItemUpdated(cmd.productId, new Counter.Updated(cmd.count))) ).matchCommand(RemoveItem.class, (ctx, state, cmd) -> Effect().persist(new ItemUpdated(cmd.productId, new Counter.Updated(-cmd.count))) ).matchExactCommand(GET_CART_ITEMS, (ctx, state, cmd) -> { ctx.getSender().tell(new CartItems(filterEmptyAndNegative(state)), ctx.getSelf()); return Effect().none(); }).build(); } private Map<String, Integer> filterEmptyAndNegative(Map<String, Counter> cart) { Map<String, Integer> result = new HashMap<>(); for (Map.Entry<String, Counter> entry : cart.entrySet()) { int count = entry.getValue().value().intValue(); if (count > 0) result.put(entry.getKey(), count); } return Collections.unmodifiableMap(result); } @Override public EventHandler<CartEvent, Map<String, Counter>> eventHandler() { return eventHandlerBuilder(CartEvent.class) .matchEvent(ItemUpdated.class, (cart, event) -> { // defensive copy for thread safety, see initialState for explanation Map<String, Counter> updatedCart = new HashMap<>(cart); final Counter counterForProduct; if (cart.containsKey(event.productId)) { counterForProduct = cart.get(event.productId); } else { counterForProduct = Counter.empty(); } updatedCart.put(event.productId, counterForProduct.applyOperation(event.update)); return Collections.unmodifiableMap(updatedCart); }).build(); } }