Section 5: Complete Event Sourced entity
On this page, we will complete the cart entity with more commands and events. This ShoppingCart
entity will use Event Sourcing to persist events that represent changes to the state of the cart.
On this page you will learn how to:
- implement an Event Sourced entity by expanding work from the previous steps and adding to the ShoppingCart:
  - Checkout - a command to check out the shopping cart
  - CheckedOut - an event to capture checkouts
  - Get - a way to get the current state of the shopping cart
At the end, we also provide a list of Optional commands and events that you can add on your own to test your knowledge.
1. Add the command and event for checkout
When the cart has been checked out it should not accept any commands that change its state. However, it should still be possible to Get the current state of a checked out cart. We suggest you try implementing Checkout on your own and then compare it with the solution shown below. Add the Checkout command alongside the existing AddItem command.
The following sections show our solution.
1.3. State changes
The state should include a value for if and when the cart was checked out:
src/main/java/shopping/cart/ShoppingCart.java

public record State(Map<String, Integer> items, Optional<Instant> checkoutDate)
    implements CborSerializable {

  public State() {
    this(new HashMap<>(), Optional.empty());
  }

  public boolean isCheckedOut() {
    return checkoutDate.isPresent();
  }

  public State checkout(Instant now) {
    return new State(items, Optional.of(now));
  }

  public Summary toSummary() {
    return new Summary(items, isCheckedOut());
  }

  public boolean hasItem(String itemId) {
    return items.containsKey(itemId);
  }

  public State updateItem(String itemId, int quantity) {
    Map<String, Integer> updatedItems = new HashMap<>(this.items);
    if (quantity == 0) {
      updatedItems.remove(itemId);
    } else {
      updatedItems.put(itemId, quantity);
    }
    return new State(updatedItems, this.checkoutDate);
  }

  public boolean isEmpty() {
    return items.isEmpty();
  }
}

public record Summary(Map<String, Integer> items, boolean checkedOut)
    implements CborSerializable {}
1.4. Unit test
Add a unit test for the new Checkout command in ShoppingCartTest:
src/test/java/shopping/cart/ShoppingCartTest.java

@Test
public void checkout() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(
              replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result2 = eventSourcedTestKit.runCommand(ShoppingCart.Checkout::new);
  assertTrue(result2.reply().isSuccess());
  assertTrue(result2.event() instanceof ShoppingCart.CheckedOut);
  assertEquals(CART_ID, result2.event().cartId());

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result3 =
          eventSourcedTestKit.runCommand(
              replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result3.reply().isError());
}
Commands should be handled differently when the cart has been checked out. AddItem is no longer allowed after checkout. Therefore, we refactor the commandHandler method into two separate methods, openShoppingCart and checkedOutShoppingCart, that are used depending on the checkedOut state. The previous code for AddItem goes into the openShoppingCart method, as does the new Checkout command.
src/main/java/shopping/cart/ShoppingCart.java

@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart()
      .orElse(checkedOutShoppingCart())
      .build();
}

private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> openShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(state -> !state.isCheckedOut())
      .onCommand(AddItem.class, this::onAddItem)
      .onCommand(Checkout.class, this::onCheckout);
}

private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) {
  if (state.hasItem(cmd.itemId)) {
    return Effect()
        .reply(
            cmd.replyTo,
            StatusReply.error(
                "Item '" + cmd.itemId + "' was already added to this shopping cart"));
  } else if (cmd.quantity <= 0) {
    return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero"));
  } else {
    return Effect()
        .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}

private ReplyEffect<Event, State> onCheckout(State state, Checkout cmd) {
  if (state.isEmpty()) {
    return Effect()
        .reply(cmd.replyTo, StatusReply.error("Cannot checkout an empty shopping cart"));
  } else {
    return Effect()
        .persist(new CheckedOut(cartId, Instant.now()))
        .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary()));
  }
}
In checkedOutShoppingCart the AddItem and Checkout commands should be rejected:
src/main/java/shopping/cart/ShoppingCart.java

private CommandHandlerWithReplyBuilderByState<Command, Event, State, State>
    checkedOutShoppingCart() {
  return newCommandHandlerWithReplyBuilder()
      .forState(State::isCheckedOut)
      .onCommand(
          AddItem.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error(
                          "Can't add an item to an already checked out shopping cart")))
      .onCommand(
          Checkout.class,
          cmd ->
              Effect()
                  .reply(
                      cmd.replyTo,
                      StatusReply.error("Can't checkout already checked out shopping cart")));
}
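The split between openShoppingCart and checkedOutShoppingCart can be pictured without Akka as a function that first looks at the state and only then at the command. The following is a minimal sketch in plain Java, not the Akka API: CartDecisionSketch, Decision, decide, whenOpen and whenCheckedOut are hypothetical names introduced for illustration, the commands carry no replyTo, and events are reduced to their names.

```java
import java.util.Map;
import java.util.Optional;

public class CartDecisionSketch {
  // Hypothetical, simplified commands (no replyTo, no Akka types).
  public interface Command {}
  public record AddItem(String itemId, int quantity) implements Command {}
  public record Checkout() implements Command {}

  // Simplified state: the items plus a checked-out flag.
  public record State(Map<String, Integer> items, boolean checkedOut) {}

  // Outcome of a command: either the name of an event to persist or an error message.
  public record Decision(Optional<String> event, Optional<String> error) {
    public static Decision persist(String event) {
      return new Decision(Optional.of(event), Optional.empty());
    }
    public static Decision reject(String error) {
      return new Decision(Optional.empty(), Optional.of(error));
    }
  }

  // Pick the handler from the state first, mirroring forState(...) in the tutorial code.
  public static Decision decide(State state, Command cmd) {
    return state.checkedOut() ? whenCheckedOut(cmd) : whenOpen(state, cmd);
  }

  private static Decision whenOpen(State state, Command cmd) {
    if (cmd instanceof AddItem add) {
      if (state.items().containsKey(add.itemId())) {
        return Decision.reject("Item '" + add.itemId() + "' was already added to this shopping cart");
      }
      if (add.quantity() <= 0) {
        return Decision.reject("Quantity must be greater than zero");
      }
      return Decision.persist("ItemAdded");
    }
    if (cmd instanceof Checkout) {
      return state.items().isEmpty()
          ? Decision.reject("Cannot checkout an empty shopping cart")
          : Decision.persist("CheckedOut");
    }
    return Decision.reject("Unknown command");
  }

  private static Decision whenCheckedOut(Command cmd) {
    if (cmd instanceof AddItem) {
      return Decision.reject("Can't add an item to an already checked out shopping cart");
    }
    if (cmd instanceof Checkout) {
      return Decision.reject("Can't checkout already checked out shopping cart");
    }
    return Decision.reject("Unknown command");
  }

  public static void main(String[] args) {
    State open = new State(Map.of("socks", 3), false);
    State done = new State(Map.of("socks", 3), true);
    // The same command is accepted or rejected depending only on the state.
    System.out.println(decide(open, new Checkout()).event().orElse("rejected"));
    System.out.println(decide(done, new Checkout()).error().orElse("accepted"));
  }
}
```

Keeping the state check in one place means each handler only has to deal with the commands that make sense for that state, which is the same reason the tutorial code uses two builders instead of one.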
1.5. Event handler
We still need to add the event handler for the CheckedOut event in the eventHandler method:
src/main/java/shopping/cart/ShoppingCart.java

@Override
public EventHandler<State, Event> eventHandler() {
  return newEventHandlerBuilder()
      .forAnyState()
      .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity))
      .onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime))
      .build();
}
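Because the event handler is a pure function from (state, event) to a new state, the current state of a cart can be rebuilt by applying its stored events in order. The sketch below illustrates that idea in plain Java under stated assumptions: CartReplaySketch, applyEvent and replay are hypothetical names, the events are simplified records without CborSerializable, and the journal is an in-memory list rather than a database. It is not how Akka Persistence is implemented internally.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CartReplaySketch {
  // Simplified stand-ins for the tutorial's events.
  public interface Event {}
  public record ItemAdded(String cartId, String itemId, int quantity) implements Event {}
  public record CheckedOut(String cartId, Instant eventTime) implements Event {}

  // Simplified stand-in for the tutorial's State record.
  public record State(Map<String, Integer> items, Optional<Instant> checkoutDate) {
    public static State empty() {
      return new State(Map.of(), Optional.empty());
    }
    public boolean isCheckedOut() {
      return checkoutDate.isPresent();
    }
  }

  // Pure event handler: returns a new State and never mutates the old one.
  public static State applyEvent(State state, Event event) {
    if (event instanceof ItemAdded added) {
      Map<String, Integer> updated = new HashMap<>(state.items());
      updated.put(added.itemId(), added.quantity());
      return new State(updated, state.checkoutDate());
    } else if (event instanceof CheckedOut checkedOut) {
      return new State(state.items(), Optional.of(checkedOut.eventTime()));
    }
    throw new IllegalArgumentException("Unknown event: " + event);
  }

  // Recover the state by folding the events, oldest first, over the empty state.
  public static State replay(List<Event> journal) {
    State state = State.empty();
    for (Event event : journal) {
      state = applyEvent(state, event);
    }
    return state;
  }

  public static void main(String[] args) {
    List<Event> journal =
        List.of(
            new ItemAdded("cart1", "socks", 3),
            new CheckedOut("cart1", Instant.parse("2024-01-01T00:00:00Z")));
    State recovered = replay(journal);
    // prints: {socks=3} checkedOut=true
    System.out.println(recovered.items() + " checkedOut=" + recovered.isCheckedOut());
  }
}
```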
2. Add Get command
Add the Get command alongside the existing AddItem and Checkout commands.
Add a unit test for the new Get command in ShoppingCartTest:
src/test/java/shopping/cart/ShoppingCartTest.java

@Test
public void get() {
  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          StatusReply<ShoppingCart.Summary>>
      result1 =
          eventSourcedTestKit.runCommand(
              replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo));
  assertTrue(result1.reply().isSuccess());

  CommandResultWithReply<
          ShoppingCart.Command,
          ShoppingCart.Event,
          ShoppingCart.State,
          ShoppingCart.Summary>
      result2 = eventSourcedTestKit.runCommand(ShoppingCart.Get::new);
  assertFalse(result2.reply().checkedOut());
  assertEquals(1, result2.reply().items().size());
  assertEquals(42, result2.reply().items().get("foo").intValue());
}
The command handler for Get is independent of the checkedOut state, so it can be added to the command handler builder using forAnyState():
src/main/java/shopping/cart/ShoppingCart.java

@Override
public CommandHandlerWithReply<Command, Event, State> commandHandler() {
  return openShoppingCart()
      .orElse(checkedOutShoppingCart())
      .orElse(getCommandHandler())
      .build();
}

private CommandHandlerWithReplyBuilderByState<Command, Event, State, State> getCommandHandler() {
  return newCommandHandlerWithReplyBuilder()
      .forAnyState()
      .onCommand(Get.class, (state, cmd) -> Effect().reply(cmd.replyTo, state.toSummary()));
}
Try the new Get command by running the unit tests with:
mvn test
3. Add new operations to the service descriptor
In the existing shopping_cart_service.proto add corresponding operation definitions in the form of the two rpc calls listed below:
syntax = "proto3";

package shoppingcart;

option java_multiple_files = true;
option java_package = "shopping.cart.proto";

// gRPC definition for ShoppingCartService
service ShoppingCartService {
  rpc AddItem(AddItemRequest) returns (Cart) {}
  rpc Checkout(CheckoutRequest) returns (Cart) {} // (1)
  rpc GetCart(GetCartRequest) returns (Cart) {} // (2)
}

message AddItemRequest {
  string cart_id = 1;
  string item_id = 2;
  int32 quantity = 3;
}

message CheckoutRequest {
  string cart_id = 1;
}

message GetCartRequest {
  string cart_id = 1;
}

message Cart { // (3)
  repeated Item items = 1;
  bool checked_out = 2; // (4)
}

message Item {
  string item_id = 1;
  int32 quantity = 2;
}

(1) Defines the Checkout operation.
(2) Defines the GetCart operation.
(3) For simplicity, most requests share a common response. However, for easier evolution of an interface, separate responses are often a better choice.
(4) Note the new checked_out flag.
Generate code by compiling the project:
mvn compile
You will see a compilation error in ShoppingCartServiceImpl.java, but that is expected with the changed Protobuf definition. We will fix that now.
Add implementations of the new operations in ShoppingCartServiceImpl in the same way as addItem:
src/main/java/shopping/cart/ShoppingCartServiceImpl.java

@Override
public CompletionStage<Cart> checkout(CheckoutRequest in) {
  logger.info("checkout {}", in.getCartId());
  var entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  var reply =
      entityRef
          .askWithStatus(ShoppingCart.Checkout::new, timeout)
          .thenApply(ShoppingCartServiceImpl::toProtoCart);
  return convertError(reply);
}

@Override
public CompletionStage<Cart> getCart(GetCartRequest in) {
  logger.info("getCart {}", in.getCartId());
  var entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId());
  var reply = entityRef.ask(ShoppingCart.Get::new, timeout);
  var protoCart =
      reply.thenApply(
          cart -> {
            if (cart.items().isEmpty())
              throw new GrpcServiceException(
                  Status.NOT_FOUND.withDescription("Cart " + in.getCartId() + " not found"));
            else return toProtoCart(cart);
          });
  return convertError(protoCart);
}
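In getCart, the exception for an empty cart is thrown inside thenApply, so it does not propagate to the caller directly; instead the returned CompletionStage completes exceptionally. The sketch below shows that general CompletionStage behavior using only the JDK. It makes no claim about what convertError does, since that helper is not shown on this page; NotFoundSketch, CartNotFound, describeCart and rootCause are hypothetical names, and CartNotFound merely stands in for GrpcServiceException.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class NotFoundSketch {
  // Hypothetical stand-in for the gRPC NOT_FOUND exception used in the tutorial.
  public static class CartNotFound extends RuntimeException {
    public CartNotFound(String message) {
      super(message);
    }
  }

  // Same shape as getCart: map the reply, but fail the stage when the cart is empty.
  public static CompletionStage<String> describeCart(
      String cartId, CompletionStage<Map<String, Integer>> reply) {
    return reply.thenApply(
        items -> {
          if (items.isEmpty()) {
            throw new CartNotFound("Cart " + cartId + " not found");
          }
          return cartId + " has " + items.size() + " item(s)";
        });
  }

  // Downstream stages may see the failure wrapped in a CompletionException, so unwrap it.
  public static Throwable rootCause(Throwable error) {
    return (error instanceof CompletionException && error.getCause() != null)
        ? error.getCause()
        : error;
  }

  public static void main(String[] args) {
    CompletionStage<String> found =
        describeCart("cart2", CompletableFuture.completedFuture(Map.of("socks", 3)));
    CompletionStage<String> missing =
        describeCart("cart9", CompletableFuture.completedFuture(Map.<String, Integer>of()));

    // prints: cart2 has 1 item(s)
    found.thenAccept(System.out::println);
    // prints: failed: Cart cart9 not found
    missing
        .exceptionally(ex -> "failed: " + rootCause(ex).getMessage())
        .thenAccept(System.out::println);
  }
}
```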
We also have to include the new checkedOut flag when converting from ShoppingCart.Summary to proto.Cart.
src/main/java/shopping/cart/ShoppingCartServiceImpl.java

private static Cart toProtoCart(ShoppingCart.Summary cart) {
  List<Item> protoItems =
      cart.items().entrySet().stream()
          .map(
              entry ->
                  Item.newBuilder()
                      .setItemId(entry.getKey())
                      .setQuantity(entry.getValue())
                      .build())
          .collect(Collectors.toList());

  return Cart.newBuilder().setCheckedOut(cart.checkedOut()).addAllItems(protoItems).build();
}
4. Run locally
Start the PostgreSQL database, unless it’s already running:
docker compose up -d
Run the service with:
# make sure to compile before running exec:exec
mvn compile exec:exec -DAPP_CONFIG=local1.conf
4.1. Exercise the service
Use grpcurl to exercise the service:
- Add an item to the cart:
  grpcurl -d '{"cartId":"cart2", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
- Check the quantity of the cart:
  grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart
- Check out the cart:
  grpcurl -d '{"cartId":"cart2"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
Optional commands and events
The commands and events listed in this section are not mandatory for subsequent steps of the tutorial and their details won’t be covered on this page. You can implement the commands, events, and State management following the pattern we used for the AddItem command and ItemAdded event in the previous step. This is a good exercise to help solidify your knowledge of how to implement EventSourcedBehavior.
Optional commands and corresponding events that you can add on your own:
- RemoveItem - remove an item from the cart
- AdjustItemQuantity - adjust the quantity of an item in the cart
- ItemRemoved
- ItemQuantityAdjusted
After adding the optional commands, you can build and run again and try them out:
Update the quantity of an item:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":6}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.UpdateItem
Get the cart state again:
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetCart