Section 6: Projection for queries
Next, we will create an Akka Projection from the events emitted by the ShoppingCart entity. The Projection will update counts in the database to track item popularity. Then, we can query the database to find how popular an item is. Since ShoppingCart entities can only be addressed by individual cart identifiers, we can find a particular cart, but we can’t find all carts that contain a particular item.
This piece of the full example focuses on the ItemPopularityProjection and a query representation in the database. On this page you will learn how to:
- implement a Projection
- distribute the Projection instances over the nodes in the Akka Cluster
- work with the Projection R2DBC API
The CQRS section explains why it is a good practice to build a Projection from entity events that can be queried. The Introduction to Akka Projections video is also a good starting point for learning about Akka Projections.
This example uses PostgreSQL to store both the Projection result and the Projection offset. An alternative is described in Use Cassandra instead of PostgreSQL.
1. Process events in a Projection
To process events in a projection, we will:
- encapsulate database access with ItemPopularityRepository, which can have a stubbed implementation for tests
- add a Repository implementation for R2DBC
- implement the event processing of the Projection in a Handler
Follow these steps to process events in a Projection:
- Add the ItemPopularityRepository:
src/main/java/shopping/cart/repository/ItemPopularityRepository.java:

    package shopping.cart.repository;

    import akka.projection.r2dbc.javadsl.R2dbcSession; // (1)
    import java.util.Optional;
    import java.util.concurrent.CompletionStage;
    import shopping.cart.ItemPopularity;

    public interface ItemPopularityRepository {

      CompletionStage<Long> saveOrUpdate(R2dbcSession session, ItemPopularity itemPopularity);

      CompletionStage<Optional<ItemPopularity>> findById(R2dbcSession session, String id);

      CompletionStage<Optional<Long>> getCount(R2dbcSession session, String id);
    }

(1) Make sure to import the right projection R2DBC session: `akka.projection.r2dbc.javadsl.R2dbcSession`.
- Add the ItemPopularity:
src/main/java/shopping/cart/ItemPopularity.java:

    package shopping.cart;

    public record ItemPopularity(String itemId, long count) {

      public ItemPopularity() {
        // null version means the entity is not on the DB
        this("", 0);
      }

      public ItemPopularity changeCount(long delta) {
        return new ItemPopularity(itemId, count + delta);
      }
    }
- Add the implementation for PostgreSQL by using the R2dbcSession:
src/main/java/shopping/cart/repository/ItemPopularityRepositoryImpl.java:

    package shopping.cart.repository;

    import akka.projection.r2dbc.javadsl.R2dbcSession;
    import java.util.Optional;
    import java.util.concurrent.CompletionStage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.cart.ItemPopularity;

    public class ItemPopularityRepositoryImpl implements ItemPopularityRepository {

      private final Logger logger = LoggerFactory.getLogger(getClass());

      @Override
      public CompletionStage<Long> saveOrUpdate(R2dbcSession session, ItemPopularity itemPopularity) {
        return session.updateOne(
            session
                .createStatement(
                    "INSERT INTO item_popularity (itemid, count) VALUES ($1, $2) "
                        + "ON CONFLICT (itemid) DO UPDATE SET count = item_popularity.count + $3")
                .bind(0, itemPopularity.itemId())
                .bind(1, itemPopularity.count())
                .bind(2, itemPopularity.count()));
      }

      @Override
      public CompletionStage<Optional<ItemPopularity>> findById(R2dbcSession session, String id) {
        return session.selectOne(
            session
                .createStatement("SELECT itemid, count FROM item_popularity WHERE itemid = $1")
                .bind(0, id),
            row -> new ItemPopularity(row.get("itemid", String.class), row.get("count", Long.class)));
      }

      @Override
      public CompletionStage<Optional<Long>> getCount(R2dbcSession session, String id) {
        return session.selectOne(
            session.createStatement("SELECT count FROM item_popularity WHERE itemid = $1").bind(0, id),
            row -> row.get("count", Long.class));
      }
    }
- Add a class ItemPopularityProjectionHandler:
src/main/java/shopping/cart/ItemPopularityProjectionHandler.java:

    package shopping.cart;

    import akka.Done;
    import akka.persistence.query.typed.EventEnvelope;
    import akka.projection.r2dbc.javadsl.R2dbcHandler;
    import akka.projection.r2dbc.javadsl.R2dbcSession;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import shopping.cart.repository.ItemPopularityRepository;

    public final class ItemPopularityProjectionHandler
        extends R2dbcHandler<EventEnvelope<ShoppingCart.Event>> { // (1)
      private final Logger logger = LoggerFactory.getLogger(getClass());

      private final String slice;
      private final ItemPopularityRepository repo;

      public ItemPopularityProjectionHandler(String slice, ItemPopularityRepository repo) {
        this.slice = slice;
        this.repo = repo;
      }

      private CompletionStage<ItemPopularity> findOrNew(R2dbcSession session, String itemId) {
        return repo.findById(session, itemId)
            .thenApply(
                itemPopularity -> itemPopularity.orElseGet(() -> new ItemPopularity(itemId, 0L)));
      }

      @Override
      public CompletionStage<Done> process(
          R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) { // (2)
        ShoppingCart.Event event = envelope.event();

        switch (event) {
          case ShoppingCart.ItemAdded(var __, String itemId, int qtd) -> { // (3)
            var itemPopularity = new ItemPopularity(itemId, qtd);
            var updated = repo.saveOrUpdate(session, itemPopularity);
            return updated.thenApply(
                rows -> {
                  logCount(itemId, rows);
                  return Done.getInstance();
                });
          }
          case ShoppingCart.CheckedOut ignored -> {
            return CompletableFuture.completedFuture(Done.getInstance());
          }
          case null, default -> {
            throw new IllegalArgumentException("Unknown event type: " + event.getClass());
          }
        }
      }

      private CompletionStage<Done> getCount(R2dbcSession session, String itemId) {
        return repo.getCount(session, itemId)
            .thenApply(
                optLong -> {
                  optLong.ifPresent(aLong -> logCount(itemId, aLong));
                  return Done.getInstance();
                });
      }

      private void logCount(String itemId, Long count) {
        logger.info(
            "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
            this.slice,
            itemId,
            count);
      }
    }

(1) Extends akka.projection.r2dbc.javadsl.R2dbcHandler.
(2) The process method will be handed each event for processing.
(3) Match events and update the count via the ItemPopularityRepository, which encapsulates the database access.
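The accumulate-on-conflict behaviour of saveOrUpdate can be illustrated without a database. The following is a minimal in-memory sketch, not part of the example project: the class InMemoryItemPopularity and its method signatures are hypothetical, it leaves out R2dbcSession and CompletionStage, and it returns the new total rather than the number of updated rows. It only shows that the first write inserts the count and later writes add to it.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for the item_popularity table (illustration only).
public class InMemoryItemPopularity {

  private final Map<String, Long> counts = new ConcurrentHashMap<>();

  // Mirrors "INSERT ... ON CONFLICT (itemid) DO UPDATE SET count = count + delta":
  // inserts the delta for a new item, otherwise adds it to the stored count.
  public long saveOrUpdate(String itemId, long delta) {
    return counts.merge(itemId, delta, Long::sum);
  }

  // Mirrors the SELECT by itemid: empty when the item has never been added.
  public Optional<Long> getCount(String itemId) {
    return Optional.ofNullable(counts.get(itemId));
  }

  public static void main(String[] args) {
    InMemoryItemPopularity repo = new InMemoryItemPopularity();
    repo.saveOrUpdate("hoodie", 5);
    repo.saveOrUpdate("hoodie", 2);
    System.out.println("hoodie: " + repo.getCount("hoodie").orElse(0L));
    System.out.println("socks: " + repo.getCount("socks").orElse(0L));
  }
}
```

A stub along these lines, adapted to the real ItemPopularityRepository interface, is one way to test the handler without PostgreSQL.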
2. Create Projection
We want to connect the events from the ShoppingCart with the Projection. Several instances of the Projection may run on different nodes of the Akka Cluster. A numeric slice between 0 and 1023 is calculated from the entity id and assigned to all events it emits. Each projection instance works on a specific range of slices. For example, with the default 4 projection instances, the 4 instances will process slices 0-255, 256-511, 512-767 and 768-1023. Events from a specific entity will always be processed by the same projection instance so that it can build a stateful model from the events if needed.
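The slice arithmetic described above can be sketched in plain Java. This is an illustration under stated assumptions, not the Akka implementation: the class SliceRangesSketch is hypothetical, the slice is assumed to be derived from the hash code of the persistence id, and the id format "ShoppingCart|cart3" is only an example. In the real project the ranges come from EventSourcedProvider.sliceRanges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how 1024 slices can be split evenly across projection instances.
public class SliceRangesSketch {

  // The text states that slices are numbered 0 to 1023.
  static final int NUMBER_OF_SLICES = 1024;

  // Assumption: the slice is a stable function of the id's hash code.
  static int sliceFor(String persistenceId) {
    return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
  }

  // Splits the slices into equally sized [min, max] ranges.
  static List<int[]> sliceRanges(int numberOfRanges) {
    if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
      throw new IllegalArgumentException("numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
    }
    int size = NUMBER_OF_SLICES / numberOfRanges;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfRanges; i++) {
      ranges.add(new int[] {i * size, (i + 1) * size - 1});
    }
    return ranges;
  }

  // Index of the projection instance whose range contains the id's slice.
  static int instanceFor(String persistenceId, int numberOfRanges) {
    return sliceFor(persistenceId) / (NUMBER_OF_SLICES / numberOfRanges);
  }

  public static void main(String[] args) {
    for (int[] range : sliceRanges(4)) {
      System.out.println(range[0] + "-" + range[1]);
    }
    String id = "ShoppingCart|cart3"; // example id format, assumed for illustration
    System.out.println(id + " -> slice " + sliceFor(id) + ", instance " + instanceFor(id, 4));
  }
}
```

Because the slice depends only on the id, every event of a given cart maps to the same range and therefore to the same projection instance.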
To create the Projection:
- Place the initialization code of the Projection in an ItemPopularityProjection class:
src/main/java/shopping/cart/ItemPopularityProjection.java:

    package shopping.cart;

    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.japi.Pair;
    import akka.persistence.query.Offset;
    import akka.persistence.query.typed.EventEnvelope;
    import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.javadsl.ExactlyOnceProjection;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.r2dbc.R2dbcProjectionSettings;
    import akka.projection.r2dbc.javadsl.R2dbcProjection;
    import java.util.List;
    import java.util.Optional;
    import shopping.cart.repository.ItemPopularityRepository;

    public final class ItemPopularityProjection {

      private ItemPopularityProjection() {}

      public static void init(ActorSystem<?> system, ItemPopularityRepository repository) {

        ShardedDaemonProcess.get(system)
            .initWithContext( // (1)
                ProjectionBehavior.Command.class,
                "ItemPopularityProjection",
                4,
                daemonContext -> {
                  List<Pair<Integer, Integer>> sliceRanges = // (2)
                      EventSourcedProvider.sliceRanges(
                          system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses());
                  Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber());
                  return ProjectionBehavior.create(createProjection(system, repository, sliceRange));
                },
                ShardedDaemonProcessSettings.create(system),
                Optional.of(ProjectionBehavior.stopMessage()));
      }

      private static ExactlyOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>> createProjection(
          ActorSystem<?> system,
          ItemPopularityRepository repository,
          Pair<Integer, Integer> sliceRange) {

        int minSlice = sliceRange.first();
        int maxSlice = sliceRange.second();

        SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
            EventSourcedProvider.eventsBySlices(
                system,
                R2dbcReadJournal.Identifier(), // (4)
                "ShoppingCart",
                minSlice,
                maxSlice);

        String slice = "carts-" + minSlice + "-" + maxSlice;
        Optional<R2dbcProjectionSettings> settings = Optional.empty();
        return R2dbcProjection.exactlyOnce( // (5)
            ProjectionId.of("ItemPopularityProjection", slice),
            settings,
            sourceProvider,
            () -> new ItemPopularityProjectionHandler(slice, repository), // (6)
            system);
      }
    }

(1) ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
(2) The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses(). Then, the specific slice range for this instance is identified by its processNumber.
(3) The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
(4) Using the R2DBC event journal.
(5) Using R2DBC for offset storage of the Projection with the exactly-once strategy. Offset and projected model will be persisted in the same transaction.
(6) Defining a Projection Handler factory for the handler we wrote in the beginning of this part.
- Call ItemPopularityProjection.init from Main.
3. Query
To expose the item popularity to the outside of the service we can add an operation in the gRPC ShoppingCartService. Follow these steps:
- Add a new GetItemPopularity operation to the shopping_cart_service.proto:
src/main/protobuf/shoppingcart/shopping_cart_service.proto:

    service ShoppingCartService {
        rpc GetItemPopularity(GetItemPopularityRequest) returns (GetItemPopularityResponse) {}
    }

    message GetItemPopularityRequest {
        string item_id = 1;
    }

    message GetItemPopularityResponse {
        string item_id = 1;
        int64 popularity_count = 2;
    }
- Generate code from the new Protobuf specification by compiling the project.
- Add the getItemPopularity method to the ShoppingCartServiceImpl:
For this you have to add the ItemPopularityRepository as a constructor parameter to the ShoppingCartServiceImpl. The ItemPopularityRepository instance is created in Main.java, so pass that instance as a parameter to ShoppingCartServiceImpl.
src/main/java/shopping/cart/ShoppingCartServiceImpl.java:

    private final ItemPopularityRepository repository;
    private final ActorSystem<?> sys;

    public ShoppingCartServiceImpl(
        ActorSystem<?> system, ItemPopularityRepository repository) { // (1)
      this.sys = system;
      this.repository = repository;
      timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
      sharding = ClusterSharding.get(system);
    }

    @Override
    public CompletionStage<GetItemPopularityResponse> getItemPopularity(GetItemPopularityRequest in) {

      CompletionStage<Optional<ItemPopularity>> itemPopularity =
          R2dbcSession.withSession(
              sys, // (2)
              (session) -> repository.findById(session, in.getItemId())); // (3)

      return itemPopularity.thenApply(
          popularity -> {
            long count = popularity.map(ItemPopularity::count).orElse(0L);
            return GetItemPopularityResponse.newBuilder().setPopularityCount(count).build();
          });
    }

(1) Add the ItemPopularityRepository to the service implementation constructor.
(2) Provide the ActorSystem to build a R2dbcSession.
(3) Implement getItemPopularity by calling the repository to find the projected model by id and use the result to build the response.
With R2DBC, calls to the repository are asynchronous, so results are wrapped in a CompletionStage.
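Working with an asynchronous result means transforming it rather than blocking on it. The sketch below uses only the JDK to show the pattern from getItemPopularity: an optional row arrives inside a CompletionStage and is mapped to a count, with 0 as the fallback for unknown items. The class AsyncCountSketch and the lookup method are hypothetical stand-ins for the repository call, and the stored value 7 is made-up sample data.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch of mapping an asynchronous, optional result to a default value.
public class AsyncCountSketch {

  // Stand-in for an asynchronous repository query (sample data, not a real database).
  static CompletionStage<Optional<Long>> lookup(String itemId) {
    Optional<Long> row = "hoodie".equals(itemId) ? Optional.of(7L) : Optional.empty();
    return CompletableFuture.completedFuture(row);
  }

  // Transforms the eventual result without blocking: a missing row becomes a count of 0.
  static CompletionStage<Long> popularityOrZero(String itemId) {
    return lookup(itemId).thenApply(row -> row.orElse(0L));
  }

  public static void main(String[] args) {
    popularityOrZero("hoodie").thenAccept(count -> System.out.println("hoodie: " + count));
    popularityOrZero("socks").thenAccept(count -> System.out.println("socks: " + count));
  }
}
```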
4. Run locally
Try your solution by running locally:
- Start the docker containers, unless they are already running:

    docker compose up -d

- Create the item popularity table by creating a ddl-scripts/create_user_tables.sql file and adding the SQL statement below.

    CREATE TABLE IF NOT EXISTS public.item_popularity (
        itemid VARCHAR(255) NOT NULL,
        count BIGINT NOT NULL,
        PRIMARY KEY (itemid));

- Load the file into Postgres:

    docker exec -i postgres-db psql -U shopping-cart -t < ddl-scripts/create_user_tables.sql

- Run the service with:

    # make sure to compile before running exec:exec
    mvn compile exec:exec -DAPP_CONFIG=local1.conf
4.1. Exercise the service
Use grpcurl to exercise the service:
- Add 5 hoodies to a cart:

    grpcurl -d '{"cartId":"cart3", "itemId":"hoodie", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

- Check the popularity of the item:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity

- Add 2 hoodies to another cart:

    grpcurl -d '{"cartId":"cart5", "itemId":"hoodie", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

- Check that the popularity count increased to 7:

    grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity