Section 6: Projection for queries
Next, we will create an Akka Projection from the events emitted by the ShoppingCart entity. The Projection will update counts in the database to track item popularity, so that we can query the database to find how popular an item is. A separate query model is needed because ShoppingCart entities can only be addressed by individual cart identifiers: we can find a particular cart, but we can’t find all carts that contain a particular item.
This piece of the full example focuses on the ItemPopularityProjection and a query representation in the database. On this page you will learn how to:
- implement a Projection
- distribute the Projection instances over the nodes in the Akka Cluster
- work with the Projection R2DBC API
The CQRS section explains why it is a good practice to build a Projection from entity events that can be queried. The Introduction to Akka Projections video is also a good starting point for learning about Akka Projections.
This example uses PostgreSQL to store both the Projection result and the Projection offset. An alternative is described in Use Cassandra instead of PostgreSQL.
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Process events in a Projection
To process events in a projection, we will:
- encapsulate database access with ItemPopularityRepository, which can have a stubbed implementation for tests
- add a Repository implementation for R2DBC
- implement the event processing of the Projection in a Handler
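To illustrate why hiding database access behind a repository makes testing easier, here is a minimal sketch of an in-memory stub. It is not part of the tutorial code: the PopularityStore interface is a hypothetical, simplified contract that omits the R2dbcSession parameter so the example runs without Akka or a database. The stub only mimics the "insert, or add to the existing count" behaviour.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

class StubDemo {
  public static void main(String[] args) {
    InMemoryPopularityStore store = new InMemoryPopularityStore();
    store.update("hoodie", 5);
    store.update("hoodie", 2);
    // prints Optional[7]
    System.out.println(store.getCount("hoodie").toCompletableFuture().join());
  }
}

// Hypothetical, simplified contract (assumption): unlike the tutorial's
// ItemPopularityRepository, it takes no R2dbcSession.
interface PopularityStore {
  CompletionStage<Long> update(String itemId, long delta);

  CompletionStage<Optional<Long>> getCount(String itemId);
}

// In-memory stub: stores counts in a map instead of a database table.
class InMemoryPopularityStore implements PopularityStore {
  private final Map<String, Long> counts = new ConcurrentHashMap<>();

  @Override
  public CompletionStage<Long> update(String itemId, long delta) {
    // merge() stores delta for a new key, otherwise adds it to the stored value
    counts.merge(itemId, delta, Long::sum);
    return CompletableFuture.completedFuture(1L); // pretend one row was affected
  }

  @Override
  public CompletionStage<Optional<Long>> getCount(String itemId) {
    return CompletableFuture.completedFuture(Optional.ofNullable(counts.get(itemId)));
  }
}
```

A handler written against such an interface can be unit tested by passing in the stub instead of the PostgreSQL-backed implementation.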
Follow these steps to process events in a Projection:
- Add the ItemPopularityRepository:
  - Java
    src/main/java/shopping/cart/repository/ItemPopularityRepository.java:

        package shopping.cart.repository;

        import akka.projection.r2dbc.javadsl.R2dbcSession; // (1)
        import java.util.Optional;
        import java.util.concurrent.CompletionStage;
        import shopping.cart.ItemPopularity;

        public interface ItemPopularityRepository {

          CompletionStage<Long> saveOrUpdate(R2dbcSession session, ItemPopularity itemPopularity);

          CompletionStage<Optional<ItemPopularity>> findById(R2dbcSession session, String id);

          CompletionStage<Optional<Long>> getCount(R2dbcSession session, String id);
        }

    (1) Make sure to import the right projection R2DBC session: `akka.projection.r2dbc.javadsl.R2dbcSession`.
  - Scala
    src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:

        package shopping.cart.repository

        import akka.projection.r2dbc.scaladsl.R2dbcSession // (1)
        import scala.concurrent.Future

        trait ItemPopularityRepository {
          def update(session: R2dbcSession, itemId: String, delta: Int): Future[Long]
          def getItem(session: R2dbcSession, itemId: String): Future[Option[Long]]
        }

    (1) Make sure to import the right projection R2DBC session: `akka.projection.r2dbc.scaladsl.R2dbcSession`.
- Add the ItemPopularity:
  - Java
    src/main/java/shopping/cart/ItemPopularity.java:

        package shopping.cart;

        public record ItemPopularity(String itemId, long count) {

          public ItemPopularity() {
            // null version means the entity is not on the DB
            this("", 0);
          }

          public ItemPopularity changeCount(long delta) {
            return new ItemPopularity(itemId, count + delta);
          }
        }

  - Scala
    Note that we are returning the popularity value directly, but adding an ItemPopularity could be an option as well.
- Add the implementation for PostgreSQL by using the R2dbcSession:
  - Java
    src/main/java/shopping/cart/repository/ItemPopularityRepositoryImpl.java:

        package shopping.cart.repository;

        import akka.projection.r2dbc.javadsl.R2dbcSession;
        import java.util.Optional;
        import java.util.concurrent.CompletionStage;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import shopping.cart.ItemPopularity;

        public class ItemPopularityRepositoryImpl implements ItemPopularityRepository {

          private final Logger logger = LoggerFactory.getLogger(getClass());

          @Override
          public CompletionStage<Long> saveOrUpdate(R2dbcSession session, ItemPopularity itemPopularity) {
            // Insert the item, or add to its existing count if the row is already there
            return session.updateOne(
                session
                    .createStatement(
                        "INSERT INTO item_popularity (itemid, count) VALUES ($1, $2) "
                            + "ON CONFLICT (itemid) DO UPDATE SET count = item_popularity.count + $3")
                    .bind(0, itemPopularity.itemId())
                    .bind(1, itemPopularity.count())
                    .bind(2, itemPopularity.count()));
          }

          @Override
          public CompletionStage<Optional<ItemPopularity>> findById(R2dbcSession session, String id) {
            return session.selectOne(
                session
                    .createStatement("SELECT itemid, count FROM item_popularity WHERE itemid = $1")
                    .bind(0, id),
                row -> new ItemPopularity(row.get("itemid", String.class), row.get("count", Long.class)));
          }

          @Override
          public CompletionStage<Optional<Long>> getCount(R2dbcSession session, String id) {
            return session.selectOne(
                session.createStatement("SELECT count FROM item_popularity WHERE itemid = $1").bind(0, id),
                row -> row.get("count", Long.class));
          }
        }

  - Scala
    src/main/scala/shopping/cart/repository/ItemPopularityRepository.scala:

        class ItemPopularityRepositoryImpl() extends ItemPopularityRepository {

          override def update(
              session: R2dbcSession,
              itemId: String,
              delta: Int): Future[Long] = {
            // Insert the item, or add to its existing count if the row is already there
            session.updateOne(
              session
                .createStatement(
                  "INSERT INTO item_popularity (itemid, count) VALUES ($1, $2) " +
                  "ON CONFLICT(itemid) DO UPDATE SET count = item_popularity.count + $3")
                .bind(0, itemId)
                .bind(1, delta)
                .bind(2, delta))
          }

          override def getItem(
              session: R2dbcSession,
              itemId: String): Future[Option[Long]] = {
            session.selectOne(
              session
                .createStatement("SELECT count FROM item_popularity WHERE itemid = $1")
                .bind(0, itemId)) { row =>
              row.get("count", classOf[java.lang.Long])
            }
          }
        }
- Add a class ItemPopularityProjectionHandler:
  - Java
    src/main/java/shopping/cart/ItemPopularityProjectionHandler.java:

        package shopping.cart;

        import akka.Done;
        import akka.persistence.query.typed.EventEnvelope;
        import akka.projection.r2dbc.javadsl.R2dbcHandler;
        import akka.projection.r2dbc.javadsl.R2dbcSession;
        import java.util.concurrent.CompletableFuture;
        import java.util.concurrent.CompletionStage;
        import org.slf4j.Logger;
        import org.slf4j.LoggerFactory;
        import shopping.cart.repository.ItemPopularityRepository;

        public final class ItemPopularityProjectionHandler
            extends R2dbcHandler<EventEnvelope<ShoppingCart.Event>> { // (1)
          private final Logger logger = LoggerFactory.getLogger(getClass());

          private final String slice;
          private final ItemPopularityRepository repo;

          public ItemPopularityProjectionHandler(String slice, ItemPopularityRepository repo) {
            this.slice = slice;
            this.repo = repo;
          }

          private CompletionStage<ItemPopularity> findOrNew(R2dbcSession session, String itemId) {
            return repo.findById(session, itemId)
                .thenApply(
                    itemPopularity -> itemPopularity.orElseGet(() -> new ItemPopularity(itemId, 0L)));
          }

          @Override
          public CompletionStage<Done> process(
              R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) { // (2)
            ShoppingCart.Event event = envelope.event();

            switch (event) {
              case ShoppingCart.ItemAdded(var __, String itemId, int qtd) -> { // (3)
                var itemPopularity = new ItemPopularity(itemId, qtd);
                var updated = repo.saveOrUpdate(session, itemPopularity);
                // Log the stored count rather than the number of updated rows
                return updated.thenCompose(rows -> getCount(session, itemId));
              }
              case ShoppingCart.CheckedOut ignored -> {
                return CompletableFuture.completedFuture(Done.getInstance());
              }
              case null, default -> {
                throw new IllegalArgumentException("Unknown event type: " + event.getClass());
              }
            }
          }

          // Reads the current count for the item and logs it
          private CompletionStage<Done> getCount(R2dbcSession session, String itemId) {
            return repo.getCount(session, itemId)
                .thenApply(
                    optLong -> {
                      optLong.ifPresent(aLong -> logCount(itemId, aLong));
                      return Done.getInstance();
                    });
          }

          private void logCount(String itemId, Long count) {
            logger.info(
                "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                this.slice,
                itemId,
                count);
          }
        }

    (1) Extends akka.projection.r2dbc.javadsl.R2dbcHandler.
    (2) The process method is handed each event for processing.
    (3) Match events and increment or decrement the count via the ItemPopularityRepository, which encapsulates the database access.
  - Scala
    src/main/scala/shopping/cart/ItemPopularityProjectionHandler.scala:

        package shopping.cart

        import akka.Done
        import akka.actor.typed.ActorSystem
        import akka.persistence.query.typed.EventEnvelope
        import akka.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession }
        import org.slf4j.LoggerFactory
        import shopping.cart.repository.ItemPopularityRepository

        import scala.concurrent.Future

        class ItemPopularityProjectionHandler(
            slice: String,
            system: ActorSystem[_],
            repo: ItemPopularityRepository)
            extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]]() { // (1)

          private val logger = LoggerFactory.getLogger(getClass)
          import system.executionContext

          override def process(
              session: R2dbcSession,
              envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = { // (2)
            envelope.event match { // (3)
              case ShoppingCart.ItemAdded(_, itemId, quantity) =>
                repo
                  .update(session, itemId, quantity)
                  .flatMap(_ => logItemCount(session, itemId))

              case ShoppingCart.ItemQuantityAdjusted(
                    _,
                    itemId,
                    newQuantity,
                    oldQuantity) =>
                repo
                  .update(session, itemId, newQuantity - oldQuantity)
                  .flatMap(_ => logItemCount(session, itemId))

              case ShoppingCart.ItemRemoved(_, itemId, oldQuantity) =>
                repo
                  .update(session, itemId, 0 - oldQuantity)
                  .flatMap(_ => logItemCount(session, itemId))

              case _: ShoppingCart.CheckedOut =>
                Future.successful(Done)
            }
          }

          // Reads the current count for the item and logs it
          private def logItemCount(
              session: R2dbcSession,
              itemId: String): Future[Done] = {
            def log(count: Long): Unit =
              logger.info(
                "ItemPopularityProjectionHandler({}) item popularity for '{}': [{}]",
                slice,
                itemId,
                count)

            repo.getItem(session, itemId).map {
              case Some(l) =>
                log(l)
                Done
              case None =>
                throw new Exception("Something wrong during querying")
            }
          }
        }

    (1) Extends akka.projection.r2dbc.scaladsl.R2dbcHandler.
    (2) The process method is handed each event for processing.
    (3) Match events and increment or decrement the count via the ItemPopularityRepository, which encapsulates the database access.
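The core of the handler is the mapping from each event to a change in the item count. The following sketch isolates that mapping as a pure function so it can be checked without a database. The event records are illustrative stand-ins whose field order follows the Scala handler above; they are assumptions for this example, not the tutorial's ShoppingCart.Event classes.

```java
import java.util.List;

class PopularityDelta {
  // Stand-in event types (assumption: shapes mirror the events matched by the Scala handler)
  interface Event {}

  record ItemAdded(String cartId, String itemId, int quantity) implements Event {}

  record ItemQuantityAdjusted(String cartId, String itemId, int newQuantity, int oldQuantity)
      implements Event {}

  record ItemRemoved(String cartId, String itemId, int oldQuantity) implements Event {}

  record CheckedOut(String cartId) implements Event {}

  // Returns how much the popularity count changes for one event.
  static int delta(Event event) {
    if (event instanceof ItemAdded added) return added.quantity();
    if (event instanceof ItemQuantityAdjusted adj) return adj.newQuantity() - adj.oldQuantity();
    if (event instanceof ItemRemoved removed) return -removed.oldQuantity();
    if (event instanceof CheckedOut) return 0; // checkout does not affect popularity
    throw new IllegalArgumentException("Unknown event type: " + event);
  }

  public static void main(String[] args) {
    List<Event> events =
        List.of(
            new ItemAdded("cart1", "hoodie", 5),
            new ItemQuantityAdjusted("cart1", "hoodie", 3, 5),
            new ItemAdded("cart2", "hoodie", 2),
            new CheckedOut("cart1"));
    int total = events.stream().mapToInt(PopularityDelta::delta).sum();
    System.out.println(total); // prints 5
  }
}
```

Keeping this mapping separate from the database call is one possible design; the tutorial's handlers inline it in the pattern match instead.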
2. Create Projection
We want to connect the events from the ShoppingCart with the Projection. Several instances of the Projection may run on different nodes of the Akka Cluster. A numeric slice between 0 and 1023 is calculated from the entity id and assigned to all events it emits. A projection instance will be working on a specific range of slices. For example, with the default 4 projection instances, the 4 instances will process slices 0-255, 256-511, 512-767 and 768-1023. Events from a specific entity will always be processed by the same projection instance so that it can build a stateful model from the events if needed.
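The slice arithmetic described above can be sketched in a few lines. This example is an illustration only: it divides the 1024 slices into equal contiguous ranges, and it assumes that a slice is derived from the hash code of the entity's persistence id modulo 1024. The class and method names are hypothetical; in the real service, the ranges come from EventSourcedProvider.sliceRanges.

```java
import java.util.ArrayList;
import java.util.List;

class SliceRangesSketch {
  static final int NUMBER_OF_SLICES = 1024;

  // Splits slices 0..1023 into equally sized ranges; each entry is {min, max}, inclusive.
  static List<int[]> sliceRanges(int numberOfRanges) {
    if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
      throw new IllegalArgumentException("numberOfRanges must divide " + NUMBER_OF_SLICES);
    }
    int size = NUMBER_OF_SLICES / numberOfRanges;
    List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < numberOfRanges; i++) {
      ranges.add(new int[] {i * size, (i + 1) * size - 1});
    }
    return ranges;
  }

  // Assumed hashing scheme, for illustration: a stable value between 0 and 1023.
  static int sliceFor(String persistenceId) {
    return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
  }

  // Index of the projection instance whose range contains the entity's slice.
  static int instanceFor(String persistenceId, int numberOfRanges) {
    return sliceFor(persistenceId) / (NUMBER_OF_SLICES / numberOfRanges);
  }

  public static void main(String[] args) {
    // prints 0-255, 256-511, 512-767 and 768-1023 on separate lines
    for (int[] range : sliceRanges(4)) {
      System.out.println(range[0] + "-" + range[1]);
    }
    System.out.println("instance for cart3: " + instanceFor("ShoppingCart|cart3", 4));
  }
}
```

Because the slice depends only on the persistence id, every event of one cart maps to the same range and therefore to the same projection instance.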
To create the Projection:
- Place the initialization code of the Projection in an ItemPopularityProjection class (an object in Scala):
  - Java
    src/main/java/shopping/cart/ItemPopularityProjection.java:

        package shopping.cart;

        import akka.actor.typed.ActorSystem;
        import akka.cluster.sharding.typed.ShardedDaemonProcessSettings;
        import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
        import akka.japi.Pair;
        import akka.persistence.query.Offset;
        import akka.persistence.query.typed.EventEnvelope;
        import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
        import akka.projection.ProjectionBehavior;
        import akka.projection.ProjectionId;
        import akka.projection.eventsourced.javadsl.EventSourcedProvider;
        import akka.projection.javadsl.ExactlyOnceProjection;
        import akka.projection.javadsl.SourceProvider;
        import akka.projection.r2dbc.R2dbcProjectionSettings;
        import akka.projection.r2dbc.javadsl.R2dbcProjection;
        import java.util.List;
        import java.util.Optional;
        import shopping.cart.repository.ItemPopularityRepository;

        public final class ItemPopularityProjection {

          private ItemPopularityProjection() {}

          public static void init(ActorSystem<?> system, ItemPopularityRepository repository) {

            ShardedDaemonProcess.get(system)
                .initWithContext( // (1)
                    ProjectionBehavior.Command.class,
                    "ItemPopularityProjection",
                    4,
                    daemonContext -> {
                      List<Pair<Integer, Integer>> sliceRanges = // (2)
                          EventSourcedProvider.sliceRanges(
                              system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses());
                      Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber());
                      return ProjectionBehavior.create(createProjection(system, repository, sliceRange));
                    },
                    ShardedDaemonProcessSettings.create(system),
                    Optional.of(ProjectionBehavior.stopMessage()));
          }

          private static ExactlyOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>>
              createProjection(
                  ActorSystem<?> system,
                  ItemPopularityRepository repository,
                  Pair<Integer, Integer> sliceRange) {

            int minSlice = sliceRange.first();
            int maxSlice = sliceRange.second();

            SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = // (3)
                EventSourcedProvider.eventsBySlices(
                    system,
                    R2dbcReadJournal.Identifier(), // (4)
                    "ShoppingCart",
                    minSlice,
                    maxSlice);

            String slice = "carts-" + minSlice + "-" + maxSlice;
            Optional<R2dbcProjectionSettings> settings = Optional.empty();
            return R2dbcProjection.exactlyOnce( // (5)
                ProjectionId.of("ItemPopularityProjection", slice),
                settings,
                sourceProvider,
                () -> new ItemPopularityProjectionHandler(slice, repository), // (6)
                system);
          }
        }

    (1) ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
    (2) The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses(). Then, the specific slice range for this instance is identified by its processNumber.
    (3) The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
    (4) Using the R2DBC event journal.
    (5) Using R2DBC for offset storage of the Projection using the exactly-once strategy. Offset and projected model will be persisted in the same transaction.
    (6) Defining a Projection Handler factory for the handler we wrote in the beginning of this part.
  - Scala
    src/main/scala/shopping/cart/ItemPopularityProjection.scala:

        package shopping.cart

        import akka.actor.typed.ActorSystem
        import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
        import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
        import akka.persistence.query.Offset
        import akka.persistence.query.typed.EventEnvelope
        import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
        import akka.projection.eventsourced.scaladsl.EventSourcedProvider
        import akka.projection.r2dbc.scaladsl.R2dbcProjection
        import akka.projection.scaladsl.SourceProvider
        import akka.projection.{ Projection, ProjectionBehavior, ProjectionId }
        import shopping.cart.repository.ItemPopularityRepository

        object ItemPopularityProjection {

          def init(
              system: ActorSystem[_],
              repository: ItemPopularityRepository): Unit = {

            def sourceProvider(sliceRange: Range)
                : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] =
              EventSourcedProvider // (1)
                .eventsBySlices[ShoppingCart.Event](
                  system,
                  readJournalPluginId = R2dbcReadJournal.Identifier, // (2)
                  "ShoppingCart",
                  sliceRange.min,
                  sliceRange.max)

            def projection(
                sliceRange: Range): Projection[EventEnvelope[ShoppingCart.Event]] = {
              val minSlice = sliceRange.min
              val maxSlice = sliceRange.max
              val projectionId =
                ProjectionId("ItemPopularityProjection", s"carts-$minSlice-$maxSlice")

              R2dbcProjection.exactlyOnce( // (3)
                projectionId,
                settings = None,
                sourceProvider(sliceRange),
                handler = () => // (4)
                  new ItemPopularityProjectionHandler(
                    s"carts-$minSlice-$maxSlice",
                    system,
                    repository))(system)
            }

            ShardedDaemonProcess(system).initWithContext( // (5)
              name = "ItemPopularityProjection",
              initialNumberOfInstances = 4,
              behaviorFactory = { daemonContext =>
                val sliceRanges = // (6)
                  EventSourcedProvider.sliceRanges(
                    system,
                    R2dbcReadJournal.Identifier,
                    daemonContext.totalProcesses)
                val sliceRange = sliceRanges(daemonContext.processNumber)
                ProjectionBehavior(projection(sliceRange))
              },
              ShardedDaemonProcessSettings(system),
              stopMessage = ProjectionBehavior.Stop)
          }
        }

    (1) The source of the Projection is EventSourcedProvider.eventsBySlices with the selected slice range, defined by a minSlice and a maxSlice.
    (2) Using the R2DBC event journal.
    (3) Using R2DBC for offset storage of the Projection using the exactly-once strategy. Offset and projected model will be persisted in the same transaction.
    (4) Defining a Projection Handler factory for the handler we wrote in the beginning of this part.
    (5) ShardedDaemonProcess manages the Projection instances. It ensures the Projection instances are always running and distributes them over the nodes in the Akka Cluster.
    (6) The slice ranges are calculated based on the number of active Projection instances, given by daemonContext.totalProcesses. Then, the specific slice range for this instance is identified by its processNumber.
- Call ItemPopularityProjection.init from Main:
  - Java
    src/main/java/shopping/cart/Main.java

        ItemPopularityRepository itemPopularityRepository = new ItemPopularityRepositoryImpl();
        ItemPopularityProjection.init(system, itemPopularityRepository); // (1)

    (1) Initialize the Projection, passing an instance of the repository implementation.
  - Scala
    src/main/scala/shopping/cart/Main.scala

        val itemPopularityRepository = new ItemPopularityRepositoryImpl()
        ItemPopularityProjection.init(system, itemPopularityRepository) // (1)

    (1) Initialize the Projection, passing an instance of the repository implementation.
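The exactly-once strategy mentioned above stores the offset and the projected model in the same transaction. The toy model below illustrates why that matters: because the count and the offset change together, a replayed event is recognised and skipped instead of being counted twice. This is a conceptual sketch only, not how Akka Projection R2DBC is implemented. Offsets are modelled as plain increasing numbers, and a synchronized method stands in for a database transaction; both are simplifying assumptions.

```java
import java.util.HashMap;
import java.util.Map;

class ExactlyOnceSketch {
  private long storedOffset = 0; // last offset whose effect has been stored
  private final Map<String, Long> counts = new HashMap<>();

  // Applies the event only if its offset is newer than the stored one.
  // Count and offset are updated together, mimicking a single transaction.
  synchronized boolean process(long offset, String itemId, long delta) {
    if (offset <= storedOffset) {
      return false; // already processed, for example a replay after a restart
    }
    counts.merge(itemId, delta, Long::sum);
    storedOffset = offset;
    return true;
  }

  synchronized long count(String itemId) {
    return counts.getOrDefault(itemId, 0L);
  }

  public static void main(String[] args) {
    ExactlyOnceSketch projection = new ExactlyOnceSketch();
    projection.process(1, "hoodie", 5);
    projection.process(2, "hoodie", 2);
    projection.process(2, "hoodie", 2); // replayed event is ignored
    System.out.println(projection.count("hoodie")); // prints 7
  }
}
```

If the offset were stored separately from the count, a crash between the two writes could leave the count updated but the offset not, and the event would be applied again after a restart.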
3. Query
To expose the item popularity to the outside of the service we can add an operation in the gRPC ShoppingCartService. Follow these steps:
- Add a new GetItemPopularity operation to the shopping_cart_service.proto:

  src/main/protobuf/shoppingcart/shopping_cart_service.proto

      service ShoppingCartService {
        rpc GetItemPopularity(GetItemPopularityRequest) returns (GetItemPopularityResponse) {}
      }

      message GetItemPopularityRequest {
        string item_id = 1;
      }

      message GetItemPopularityResponse {
        string item_id = 1;
        int64 popularity_count = 2;
      }

- Generate code from the new Protobuf specification by compiling the project:
  - Java

        mvn compile

  - Scala

        sbt compile
- Add the getItemPopularity method to the ShoppingCartServiceImpl:

  For this you have to add the ItemPopularityRepository as a constructor parameter to the ShoppingCartServiceImpl. The ItemPopularityRepository instance is created in Main.java (Java) or Main.scala (Scala), so pass that instance as a parameter to ShoppingCartServiceImpl.
  - Java
    src/main/java/shopping/cart/ShoppingCartServiceImpl.java

        private final ItemPopularityRepository repository;
        private final ActorSystem<?> sys;

        public ShoppingCartServiceImpl(
            ActorSystem<?> system, ItemPopularityRepository repository) { // (1)

          this.sys = system;
          this.repository = repository;
          timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout");
          sharding = ClusterSharding.get(system);
        }

        @Override
        public CompletionStage<GetItemPopularityResponse> getItemPopularity(GetItemPopularityRequest in) {

          CompletionStage<Optional<ItemPopularity>> itemPopularity =
              R2dbcSession.withSession(
                  sys, // (2)
                  (session) -> repository.findById(session, in.getItemId()) // (3)
                  );

          return itemPopularity.thenApply(
              popularity -> {
                // An item that has never been projected has a popularity of 0
                long count = popularity.map(ItemPopularity::count).orElse(0L);
                return GetItemPopularityResponse.newBuilder()
                    .setItemId(in.getItemId())
                    .setPopularityCount(count)
                    .build();
              });
        }

    (1) Add the ItemPopularityRepository to the service implementation constructor.
    (2) Provide the ActorSystem to build an R2dbcSession.
    (3) Implement getItemPopularity by calling the repository to find the projected model by id and use the result to build the response.
  - Scala
    src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala

        class ShoppingCartServiceImpl(
            system: ActorSystem[_],
            itemPopularityRepository: ItemPopularityRepository) // (1)
            extends proto.ShoppingCartService {

          override def getItemPopularity(in: proto.GetItemPopularityRequest)
              : Future[proto.GetItemPopularityResponse] = {
            R2dbcSession
              .withSession(system) { session => // (2)
                itemPopularityRepository.getItem(session, in.itemId) // (3)
              }
              .map {
                case Some(count) =>
                  proto.GetItemPopularityResponse(in.itemId, count)
                case None =>
                  // An item that has never been projected has a popularity of 0
                  proto.GetItemPopularityResponse(in.itemId, 0L)
              }
          }
        }

    (1) Add the ItemPopularityRepository to the service implementation constructor.
    (2) Provide the ActorSystem to build an R2dbcSession.
    (3) Implement getItemPopularity by calling the repository to find the projected model by id and use the result to build the response.
Note: While using R2DBC, calls to the repository are asynchronous and, therefore, results are wrapped in an asynchronous type (CompletionStage in the Java code above, Future in the Scala code).
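Since the lookup result is both asynchronous and optional, the service has to transform it without blocking and supply a default for items that were never projected. The sketch below shows that pattern in isolation. The Response record is a hypothetical stand-in for the generated GetItemPopularityResponse message, and the lookup is simulated with already completed futures instead of a database call.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class PopularityQuerySketch {
  // Stand-in for the generated gRPC response message (assumption for this example)
  record Response(String itemId, long popularityCount) {}

  // Maps the asynchronous, optional count to a response, defaulting to 0.
  static CompletionStage<Response> toResponse(
      String itemId, CompletionStage<Optional<Long>> lookup) {
    return lookup.thenApply(count -> new Response(itemId, count.orElse(0L)));
  }

  public static void main(String[] args) {
    Response known =
        toResponse("hoodie", CompletableFuture.completedFuture(Optional.of(7L)))
            .toCompletableFuture()
            .join();
    Response unknown =
        toResponse("socks", CompletableFuture.completedFuture(Optional.<Long>empty()))
            .toCompletableFuture()
            .join();
    System.out.println(known.popularityCount()); // prints 7
    System.out.println(unknown.popularityCount()); // prints 0
  }
}
```

The join() calls are only there to print the results in this standalone example; the service itself returns the CompletionStage and lets the gRPC layer complete the call.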
4. Run locally
Try your solution by running locally:
- Start the docker containers, unless they are already running:

      docker compose up -d

- Create the item popularity table by creating a ddl-scripts/create_user_tables.sql file and adding the SQL statement below.

      CREATE TABLE IF NOT EXISTS public.item_popularity (
        itemid VARCHAR(255) NOT NULL,
        count BIGINT NOT NULL,
        PRIMARY KEY (itemid));

- Load the file into Postgres:

      docker exec -i postgres-db psql -U shopping-cart -t < ddl-scripts/create_user_tables.sql

- Run the service with:
  - Java

        # make sure to compile before running exec:exec
        mvn compile exec:exec -DAPP_CONFIG=local1.conf

  - Scala

        sbt -Dconfig.resource=local1.conf run
4.1. Exercise the service
Use grpcurl to exercise the service:
- Add 5 hoodies to a cart:

      grpcurl -d '{"cartId":"cart3", "itemId":"hoodie", "quantity":5}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

- Check the popularity of the item:

      grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity

- Add 2 hoodies to another cart:

      grpcurl -d '{"cartId":"cart5", "itemId":"hoodie", "quantity":2}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem

- Check that the popularity count increased to 7:

      grpcurl -d '{"itemId":"hoodie"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity