Shopping cart example

The provided CRDT data structures can be used as the root state of a replicated EventSourcedBehavior, but they can also be nested inside another data structure. Nesting them requires more careful thought about eventual consistency, because only the CRDT parts of the state are guaranteed to converge.

In this sample we model a shopping cart as a map from product id to the quantity of that product in the cart, where each quantity is the net result of all adds and removes. By using the Counter CRDT and persisting its Counter.Updated operation in our events, we can be sure that adding or removing items in any data center will eventually lead to all data centers ending up with the same number of each product.

Scala
/**
 * Replicated shopping cart whose state is a map from product id to a `Counter` CRDT.
 *
 * Every change is persisted as a `Counter.Updated` delta for a single product.
 */
object ShoppingCart {

  /** Identifier of a product in the cart. */
  type ProductId = String

  /** Messages accepted by the cart. */
  sealed trait Command extends CborSerializable

  /** Adds `count` to the quantity of product `id`. */
  final case class AddItem(id: ProductId, count: Int) extends Command

  /** Subtracts `count` from the quantity of product `id`. */
  final case class RemoveItem(id: ProductId, count: Int) extends Command

  /** Asks for the current cart content, delivered to `replyTo` as [[CartItems]]. */
  final case class GetCartItems(replyTo: ActorRef[CartItems]) extends Command

  /** Reply to [[GetCartItems]]: the products whose quantity is strictly positive. */
  final case class CartItems(items: Map[ProductId, Int]) extends CborSerializable

  /** Events persisted by the cart. */
  sealed trait Event extends CborSerializable

  /** A counter delta for a single product. */
  final case class ItemUpdated(id: ProductId, update: Counter.Updated) extends Event

  /** Cart state: one counter per product that has ever been updated. */
  final case class State(items: Map[ProductId, Counter])

  /**
   * Creates the behavior for one replica of the cart identified by `entityId`.
   *
   * @param entityId      id of the cart entity
   * @param replicaId     the replica this instance runs as
   * @param allReplicaIds all replicas taking part in the replication
   */
  def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
    val replicationId = ReplicationId("cart", entityId, replicaId)
    ReplicatedEventSourcing.commonJournalConfig(
      replicationId,
      allReplicaIds,
      PersistenceTestKitReadJournal.Identifier) { replicationContext =>
      EventSourcedBehavior[Command, Event, State](
        replicationContext.persistenceId,
        State(Map.empty),
        commandHandler,
        eventHandler)
    }
  }

  /** Turns add/remove commands into counter deltas and answers cart queries. */
  private def commandHandler(state: State, cmd: Command): Effect[Event, State] = {
    // Adds and removes differ only in the sign of the persisted delta.
    def persistDelta(productId: ProductId, delta: Int): Effect[Event, State] =
      Effect.persist(ItemUpdated(productId, Counter.Updated(delta)))

    cmd match {
      case AddItem(productId, count) =>
        persistDelta(productId, count)
      case RemoveItem(productId, count) =>
        persistDelta(productId, -count)
      case GetCartItems(replyTo) =>
        // Only products with a strictly positive quantity are reported.
        val visibleItems = state.items
          .filter { case (_, counter) => counter.value > 0 }
          .map { case (productId, counter) => productId -> counter.value.toInt }
        replyTo ! CartItems(visibleItems)
        Effect.none
    }
  }

  /** Applies a counter delta to the product's counter, starting from an empty counter if needed. */
  private def eventHandler(state: State, event: Event): State =
    event match {
      case ItemUpdated(productId, update) =>
        val current = state.items.getOrElse(productId, Counter.empty)
        State(state.items.updated(productId, current.applyOperation(update)))
    }
}
Java
sourcepublic final class ShoppingCart
    extends ReplicatedEventSourcedBehavior<
        ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {

  public interface Event {}

  public static final class ItemUpdated implements Event {
    public final String productId;
    public final Counter.Updated update;

    public ItemUpdated(String productId, Counter.Updated update) {
      this.productId = productId;
      this.update = update;
    }
  }

  public interface Command {}

  public static final class AddItem implements Command {
    public final String productId;
    public final int count;

    public AddItem(String productId, int count) {
      this.productId = productId;
      this.count = count;
    }
  }

  public static final class RemoveItem implements Command {
    public final String productId;
    public final int count;

    public RemoveItem(String productId, int count) {
      this.productId = productId;
      this.count = count;
    }
  }

  public static class GetCartItems implements Command {
    public final ActorRef<CartItems> replyTo;

    public GetCartItems(ActorRef<CartItems> replyTo) {
      this.replyTo = replyTo;
    }
  }

  public static final class CartItems {
    public final Map<String, Integer> items;

    public CartItems(Map<String, Integer> items) {
      this.items = items;
    }
  }

  public static final class State {
    public final Map<String, Counter> items = new HashMap<>();
  }

  public static Behavior<Command> create(
      String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
    return ReplicatedEventSourcing.commonJournalConfig(
        new ReplicationId("blog", entityId, replicaId),
        allReplicas,
        PersistenceTestKitReadJournal.Identifier(),
        ShoppingCart::new);
  }

  private ShoppingCart(ReplicationContext replicationContext) {
    super(replicationContext);
  }

  @Override
  public State emptyState() {
    return new State();
  }

  @Override
  public CommandHandler<Command, Event, State> commandHandler() {
    return newCommandHandlerBuilder()
        .forAnyState()
        .onCommand(AddItem.class, this::onAddItem)
        .onCommand(RemoveItem.class, this::onRemoveItem)
        .onCommand(GetCartItems.class, this::onGetCartItems)
        .build();
  }

  private Effect<Event, State> onAddItem(State state, AddItem command) {
    return Effect()
        .persist(new ItemUpdated(command.productId, new Counter.Updated(command.count)));
  }

  private Effect<Event, State> onRemoveItem(State state, RemoveItem command) {
    return Effect()
        .persist(new ItemUpdated(command.productId, new Counter.Updated(-command.count)));
  }

  private Effect<Event, State> onGetCartItems(State state, GetCartItems command) {
    command.replyTo.tell(new CartItems(filterEmptyAndNegative(state.items)));
    return Effect().none();
  }

  private Map<String, Integer> filterEmptyAndNegative(Map<String, Counter> cart) {
    Map<String, Integer> result = new HashMap<>();
    for (Map.Entry<String, Counter> entry : cart.entrySet()) {
      int count = entry.getValue().value().intValue();
      if (count > 0) result.put(entry.getKey(), count);
    }
    return Collections.unmodifiableMap(result);
  }

  @Override
  public EventHandler<State, Event> eventHandler() {
    return newEventHandlerBuilder()
        .forAnyState()
        .onEvent(ItemUpdated.class, this::onItemUpdated)
        .build();
  }

  private State onItemUpdated(State state, ItemUpdated event) {
    final Counter counterForProduct;
    if (state.items.containsKey(event.productId)) {
      counterForProduct = state.items.get(event.productId);
    } else {
      counterForProduct = Counter.empty();
    }
    state.items.put(event.productId, counterForProduct.applyOperation(event.update));
    return state;
  }
}

With this model we cannot have a ClearCart command as that could give different states in different data centers. It is quite easy to imagine such a scenario: commands arriving in the order ClearCart, AddItem('a', 5) in one data center and the order AddItem('a', 5), ClearCart in another. The first data center would end up with five items of product 'a' in the cart, while the second would end up with an empty cart.

To clear a cart a client would instead have to remove as many items of each product as it sees in the cart at the time of removal.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.