Section 4: Create the Event Sourced Cart entity
Next, we will create the Cart entity that manages the state for each shopping cart. The architectural overview shows how the entity is related to the Cart service. The Cart entity will use Event Sourcing to persist events that capture changes to the Cart’s state. The entity writes events to the event journal, which we will use later to create projections:
For now, we’ll implement the command to add items to the Cart. In the next part of the tutorial, we will expand it to handle more commands and events. On this page you will learn how to:
-
implement an Event Sourced entity
-
unit test the entity
-
distribute the entities over the nodes in the Akka Cluster
-
send requests from the gRPC service implementation to the entities
If you are unfamiliar with Event Sourcing, refer to the Event Sourcing section for an explanation.
The Event Sourcing with Akka 2.6 video is also a good starting point for learning Event Sourcing.
This example is using PostgreSQL for storing the events. An alternative is described in Use Cassandra instead of PostgreSQL.
1. Add commands and events
Commands are the "external" API of an entity. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, but different events might be generated depending on the current state of the entity. A command can also be validated and be rejected if it has invalid input or can’t be handled by current state of the entity.
To add a command and an event, follow these steps:
-
Define a
ShoppingCart
class extendingEventSourcedBehaviorWithEnforcedReplies
and theAddItem
command:- Java
-
src/main/java/shopping/cart/ShoppingCart.java:
package shopping.cart; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.actor.typed.Behavior; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.Behaviors; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.Entity; import akka.cluster.sharding.typed.javadsl.EntityTypeKey; import akka.pattern.StatusReply; import akka.persistence.typed.PersistenceId; import akka.persistence.typed.javadsl.*; import akka.serialization.jackson.CborSerializable; import java.time.Duration; import java.util.HashMap; import java.util.Map; public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... public sealed interface Command extends CborSerializable {} /** * A command to add an item to the cart. * * <p>It replies with `StatusReply<Summary>`, which is sent back to the caller when all the * events emitted by this command are successfully persisted. */ record AddItem(String itemId, int quantity, ActorRef<StatusReply<Summary>> replyTo) implements Command {} /** Summary of the shopping cart state, used in reply messages. */ public record Summary(Map<String, Integer> items) implements CborSerializable {} }
java - Scala
-
Add a corresponding
ItemAdded
event:- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... public sealed interface Event extends CborSerializable { String cartId(); } record ItemAdded(String cartId, String itemId, int quantity) implements Event {} }
java - Scala
2. Add state map
Items added to the Cart are added to a Map
. The contents of the Map
comprise the Cart’s state. Add the Map
to the ShoppingCart
class as shown:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... public record State(Map<String, Integer> items) implements CborSerializable { public State() { this(new HashMap<>()); } public boolean hasItem(String itemId) { return items.containsKey(itemId); } public State updateItem(String itemId, int quantity) { Map<String, Integer> updatedItems = new HashMap<>(this.items); if (quantity == 0) { updatedItems.remove(itemId); } else { updatedItems.put(itemId, quantity); } return new State(updatedItems); } public Summary toSummary() { return new Summary(items); } public int itemCount(String itemId) { return items.get(itemId); } public boolean isEmpty() { return items.isEmpty(); } } }
java - Scala
3. Implement a command handler
The Cart entity will receive commands that request changes to Cart state. We will implement a command handler to process these commands and emit a reply. Our business logic allows only items to be added which are not in the cart yet and require a positive quantity.
Implement the commandHandler
as required by EventSourcedBehaviorWithEnforcedReplies
:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... @Override public CommandHandlerWithReply<Command, Event, State> commandHandler() { CommandHandlerWithReplyBuilder<Command, Event, State> builder = newCommandHandlerWithReplyBuilder(); builder.forAnyState().onCommand(AddItem.class, this::onAddItem); (1) return builder.build(); } private ReplyEffect<Event, State> onAddItem(State state, AddItem cmd) { if (state.hasItem(cmd.itemId)) { return Effect() .reply( cmd.replyTo, StatusReply.error( "Item '" + cmd.itemId + "' was already added to this shopping cart")); } else if (cmd.quantity <= 0) { return Effect().reply(cmd.replyTo, StatusReply.error("Quantity must be greater than zero")); } else { return Effect() (2) .persist(new ItemAdded(cartId, cmd.itemId, cmd.quantity)) .thenReply(cmd.replyTo, updatedCart -> StatusReply.success(updatedCart.toSummary())); } } }
java1 Matching the AddItem
command.2 Persisting the ItemAdded
event and replying to the sender. - Scala
If an AddItem
command is accepted, the Effect.persist
applies an event to the cart’s state and makes sure this event is stored before replying to the command. The returned ReplyEffect
reacts on the commands by deciding which effect they should have on the entity. If the validation fails we want to send back an error message. The reply can be a success or an error and that is the reason for using the StatusReply
.
See all available effects in the Akka reference documentation .
4. Add the event handler
From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity’s event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal.
Notice that there are no decisions on events, they are applied without any checking.
Add the event handler as follows:
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { // ... @Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent(ItemAdded.class, (state, evt) -> state.updateItem(evt.itemId, evt.quantity)) .build(); } }
java - Scala
5. Add initialization
To glue the command handler, event handler, and state together, we need some initialization code. Our code will distribute the Cart entities over nodes in the Akka Cluster with Cluster Sharding , enable snapshots to reduce recovery time when the entity is started, and restart with backoff in the case of failure.
- Java
-
src/main/java/shopping/cart/ShoppingCart.java
public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { static final EntityTypeKey<Command> ENTITY_KEY = EntityTypeKey.create(Command.class, "ShoppingCart"); private final String cartId; public static void init(ActorSystem<?> system) { ClusterSharding.get(system) .init( Entity.of( ENTITY_KEY, entityContext -> { (1) return ShoppingCart.create(entityContext.getEntityId()); })); } public static Behavior<Command> create(String cartId) { return Behaviors.setup( ctx -> EventSourcedBehavior (2) .start(new ShoppingCart(cartId), ctx)); } @Override public RetentionCriteria retentionCriteria() { (3) return RetentionCriteria.snapshotEvery(100); } private ShoppingCart(String cartId) { super( PersistenceId.of(ENTITY_KEY.name(), cartId), SupervisorStrategy (4) .restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)); this.cartId = cartId; } @Override public State emptyState() { return new State(); } }
java1 The entities are distributed over the nodes in the Akka Cluster with Cluster Sharding. 2 An EventSourcedBehavior
is created for theShoppingCart
.3 Snapshotting is an optimization to reduce recovery when the entity is started. 4 Restarting with backoff in case of failures. - Scala
Then we need to call ShoppingCart.init
from Main
. Add the following before the gRPC ShoppingCartServer
initialization:
Verify that everything compiles with:
mvn compile
6. Unit testing
To test the ShoppingCart
entity we can write a unit test using the TestKitJunitResource
.
A test for the AddItem
command looks like this in src/test/java/shopping/cart/ShoppingCartTest.java
:
- Java
-
src/test/java/shopping/cart/ShoppingCartTest.java:
package shopping.cart; import static akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit.CommandResultWithReply; import static org.junit.Assert.*; import akka.actor.testkit.typed.javadsl.TestKitJunitResource; import akka.pattern.StatusReply; import akka.persistence.testkit.javadsl.EventSourcedBehaviorTestKit; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; public class ShoppingCartTest { private static final String CART_ID = "testCart"; @ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(EventSourcedBehaviorTestKit.config()); private EventSourcedBehaviorTestKit<ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> eventSourcedTestKit = EventSourcedBehaviorTestKit.create(testKit.system(), ShoppingCart.create(CART_ID)); @Before public void beforeEach() { eventSourcedTestKit.clear(); } @Test public void addAnItemToCart() { CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result.reply().isSuccess()); ShoppingCart.Summary summary = result.reply().getValue(); assertEquals(1, summary.items().size()); assertEquals(42, summary.items().get("foo").intValue()); assertEquals(new ShoppingCart.ItemAdded(CART_ID, "foo", 42), result.event()); } @Test public void rejectAlreadyAddedItem() { CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result1 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result1.reply().isSuccess()); CommandResultWithReply< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State, StatusReply<ShoppingCart.Summary>> result2 = eventSourcedTestKit.runCommand(replyTo -> new ShoppingCart.AddItem("foo", 42, replyTo)); assertTrue(result2.reply().isError()); assertTrue(result2.hasNoEvents()); } }
java - Scala
Run the test with:
mvn test
You can learn more about the TestKitJunitResource
in the Akka reference documentation
7. Send commands to the entities
We want to send commands to the entities from the gRPC service implementation. In the previous step, we wrote a dummy implementation of addItem
in the ShoppingCartServiceImpl
. We can now replace that and send ShoppingCart.AddItem
commands from ShoppingCartServiceImpl
:
- Java
-
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:
package shopping.cart; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.grpc.GrpcServiceException; import io.grpc.Status; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.cart.proto.*; public final class ShoppingCartServiceImpl implements ShoppingCartService { private final Logger logger = LoggerFactory.getLogger(getClass()); private final Duration timeout; private final ClusterSharding sharding; public ShoppingCartServiceImpl(ActorSystem<?> system) { timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout"); sharding = ClusterSharding.get(system); } @Override public CompletionStage<Cart> addItem(AddItemRequest in) { logger.info("addItem {} to cart {}", in.getItemId(), in.getCartId()); var entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, in.getCartId()); CompletionStage<ShoppingCart.Summary> reply = entityRef.askWithStatus( replyTo -> new ShoppingCart.AddItem(in.getItemId(), in.getQuantity(), replyTo), timeout); var cart = reply.thenApply(ShoppingCartServiceImpl::toProtoCart); return convertError(cart); } private static Cart toProtoCart(ShoppingCart.Summary cart) { List<Item> protoItems = cart.items().entrySet().stream() .map( entry -> Item.newBuilder() .setItemId(entry.getKey()) .setQuantity(entry.getValue()) .build()) .collect(Collectors.toList()); return Cart.newBuilder().addAllItems(protoItems).build(); } private static <T> CompletionStage<T> convertError(CompletionStage<T> response) { return response.exceptionally( ex -> { if (ex instanceof TimeoutException) { throw new GrpcServiceException( Status.UNAVAILABLE.withDescription("Operation timed out")); } else { throw new GrpcServiceException( Status.INVALID_ARGUMENT.withDescription(ex.getMessage())); } }); } }
java - Scala
If the command is successful, the entity will reply with StatusReply.Success
with the updated ShoppingCart.Summary
. If the validation in the entity fails it will reply with StatusReply.Error
, which will fail the CompletionStage
that is returned from askWithStatus
.
Also, we added an ActorSystem
parameter to the constructor of ShoppingCartServiceImpl
. Edit Main
to include the system
as the parameter when creating a new instance of the ShoppingCartServiceImpl
.
8. Configure Postgres
The events are stored in a PostgreSQL database and the template project includes configuration for that in the src/main/resources/persistence.conf
file. We have to enable this configuration by including persistence.conf
in application.conf
:
include "persistence"
9. Run locally
To run the service, we first need to start the PostgreSQL to persist the events. Then we can run the service:
-
From the root project directory, run the following command:
docker compose up -d
shell scriptThis command will build and run the containers. The
-d
flag starts the containers in detached mode. Containers started in detached mode exit when the root process used to run the container exits -
Create the PostgreSQL tables from the SQL script located inside the
ddl-scripts
at the root of the project:docker exec -i postgres-db psql -U shopping-cart -t < ddl-scripts/create_tables.sql
shell scriptIt creates all tables needed for Akka Persistence as well as the offset store table for Akka Projection.
-
Run the service with:
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local1.conf
shell script
9.1. Exercise the service
Use grpcurl
to exercise the service:
-
Use
grpcurl
to add 3 socks to a cart:grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":3}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
shell script -
Test the validation logic by trying to add the same item again, which should result in an error:
grpcurl -d '{"cartId":"cart1", "itemId":"socks", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
shell script -
To verify that the events are actually saved, and the state can be recovered from the events you can stop the service with
ctrl-c
and then start it again. -
Add 2 t-shirts to the same cart:
grpcurl -d '{"cartId":"cart1", "itemId":"t-shirt", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
shell scriptThe returned updated cart should still contain the 3 socks.
9.2. Exercise with multiple service instances
Another fun experiment is to start several instances of the service on different ports (2552, 2553) and then interact with different carts via the different gRPC servers (gRPC ports 8101, 8102, 8103). To do this, you can use the other provided configuration files:
-
In a new terminal, start a second instance with local configuration #2:
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local2.conf
shell script -
Within another terminal, start a third instance with local configuration #3:
# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local3.conf
shell script