Section 8: Projection calling gRPC service
To complete the example, we need a way to handle orders. We will add another Projection from the events of the ShoppingCart
entity. We will also create a gRPC Order Service, ShoppingOrderService
. The Projection calls the Order Service when the shopping carts are checked out.
This part of the full example focuses on the gRPC client in the SendOrderProjection
. On this page you will learn how to:
-
call another service with Akka gRPC
-
implement another gRPC service by adding the
ShoppingOrderService
Source downloads
If you prefer to simply view and run the example, download a zip file containing the completed code:
- Java
- Scala
1. Add the Order gRPC service
Let’s add the service that handles shopping cart orders. The template and source downloads include a directory named shopping-order-service
for this purpose. To add the order gRPC service, follow these steps:
-
Open the
shopping-order-service
in IntelliJ just as you did with the shopping-cart-service. -
Define the interface of the service in a protobuf service descriptor. It should be located in the
src/main/protobuf/shoppingorder/shopping_order_service.proto
in the newshopping-order-service
project.src/main/protobuf/shoppingorder/shopping_order_service.protosyntax = "proto3"; package shoppingorder; option java_multiple_files = true; option java_package = "shopping.order.proto"; // gRPC definition for ShoppingOrderService service ShoppingOrderService { rpc Order(OrderRequest) returns (OrderResponse) {} } message OrderRequest { string cart_id = 1; repeated Item items = 2; } message Item { string item_id = 1; int32 quantity = 2; } message OrderResponse { bool ok = 1; }
-
Generate code by compiling the project:
mvn compile
sbt compile
-
Implement the
ShoppingOrderService
in a new classShoppingOrderServiceImpl
:- Java
-
src/main/java/shopping/order/ShoppingOrderServiceImpl.java:
package shopping.order; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.order.proto.Item; import shopping.order.proto.OrderRequest; import shopping.order.proto.OrderResponse; import shopping.order.proto.ShoppingOrderService; public final class ShoppingOrderServiceImpl implements ShoppingOrderService { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public CompletionStage<OrderResponse> order(OrderRequest in) { int total = 0; for (Item item : in.getItemsList()) { total += item.getQuantity(); } logger.info("Order {} items from cart {}.", total, in.getCartId()); OrderResponse response = OrderResponse.newBuilder().setOk(true).build(); return CompletableFuture.completedFuture(response); } }
- Scala
-
src/main/scala/shopping/order/ShoppingOrderServiceImpl.scala:
package shopping.order import org.slf4j.LoggerFactory import shopping.order.proto.{ OrderRequest, OrderResponse } import scala.concurrent.Future class ShoppingOrderServiceImpl extends proto.ShoppingOrderService { private val logger = LoggerFactory.getLogger(getClass) override def order(in: OrderRequest): Future[OrderResponse] = { val totalNumberOfItems = in.items.iterator.map(_.quantity).sum logger.info("Order {} items from cart {}.", totalNumberOfItems, in.cartId) Future.successful(OrderResponse(ok = true)) } }
-
Similar to the gRPC server for the ShoppingCartService we need to initialize the gRPC server. Add a
ShoppingOrderServer
object class:- Java
-
src/main/java/shopping/order/ShoppingOrderServer.java:
package shopping.order; import akka.actor.typed.ActorSystem; import akka.grpc.javadsl.ServerReflection; import akka.grpc.javadsl.ServiceHandler; import akka.http.javadsl.Http; import akka.http.javadsl.ServerBinding; import akka.http.javadsl.model.HttpRequest; import akka.http.javadsl.model.HttpResponse; import akka.japi.function.Function; import java.net.InetSocketAddress; import java.time.Duration; import java.util.Collections; import java.util.concurrent.CompletionStage; import shopping.order.proto.ShoppingOrderService; import shopping.order.proto.ShoppingOrderServiceHandlerFactory; public final class ShoppingOrderServer { private ShoppingOrderServer() {} static void start( String host, int port, ActorSystem<?> system, ShoppingOrderService grpcService) { @SuppressWarnings("unchecked") Function<HttpRequest, CompletionStage<HttpResponse>> service = ServiceHandler.concatOrNotFound( ShoppingOrderServiceHandlerFactory.create(grpcService, system), // ServerReflection enabled to support grpcurl without import-path and proto parameters ServerReflection.create( Collections.singletonList(ShoppingOrderService.description), system)); CompletionStage<ServerBinding> bound = Http.get(system).newServerAt(host, port).bind(service::apply); bound.whenComplete( (binding, ex) -> { if (binding != null) { binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system); InetSocketAddress address = binding.localAddress(); system .log() .info( "Shopping order at gRPC server {}:{}", address.getHostString(), address.getPort()); } else { system.log().error("Failed to bind gRPC endpoint, terminating system", ex); system.terminate(); } }); } }
- Scala
-
src/main/scala/shopping/order/ShoppingOrderServer.scala:
package shopping.order import akka.actor.typed.ActorSystem import akka.grpc.scaladsl.{ ServerReflection, ServiceHandler } import akka.http.scaladsl.Http import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success } object ShoppingOrderServer { def start( interface: String, port: Int, system: ActorSystem[_], grpcService: proto.ShoppingOrderService): Unit = { implicit val sys: ActorSystem[_] = system implicit val ec: ExecutionContext = system.executionContext val service: HttpRequest => Future[HttpResponse] = ServiceHandler.concatOrNotFound( proto.ShoppingOrderServiceHandler.partial(grpcService), // ServerReflection enabled to support grpcurl without import-path and proto parameters ServerReflection.partial(List(proto.ShoppingOrderService))) val bound = Http() .newServerAt(interface, port) .bind(service) .map(_.addToCoordinatedShutdown(3.seconds)) bound.onComplete { case Success(binding) => val address = binding.localAddress system.log.info( "Shopping order at gRPC server {}:{}", address.getHostString, address.getPort) case Failure(ex) => system.log.error("Failed to bind gRPC endpoint, terminating system", ex) system.terminate() } } }
-
Call the
ShoppingOrderServer.start
fromMain
:- Java
-
package shopping.order; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.management.cluster.bootstrap.ClusterBootstrap; import akka.management.javadsl.AkkaManagement; import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.order.proto.ShoppingOrderService; public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); public static void main(String[] args) throws Exception { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ShoppingOrderService"); try { init(system); } catch (Exception e) { logger.error("Terminating due to initialization failure.", e); system.terminate(); } } public static void init(ActorSystem<Void> system) { AkkaManagement.get(system).start(); ClusterBootstrap.get(system).start(); Config config = system.settings().config(); String grpcInterface = config.getString("shopping-order-service.grpc.interface"); int grpcPort = config.getInt("shopping-order-service.grpc.port"); ShoppingOrderService grpcService = new ShoppingOrderServiceImpl(); ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService); } }
- Scala
-
package shopping.order import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.Behaviors import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement import org.slf4j.LoggerFactory import scala.util.control.NonFatal object Main { val logger = LoggerFactory.getLogger("shopping.order.Main") def main(args: Array[String]): Unit = { val system = ActorSystem[Nothing](Behaviors.empty, "ShoppingOrderService") try { init(system) } catch { case NonFatal(e) => logger.error("Terminating due to initialization failure.", e) system.terminate() } } def init(system: ActorSystem[_]): Unit = { AkkaManagement(system).start() ClusterBootstrap(system).start() val grpcInterface = system.settings.config.getString("shopping-order-service.grpc.interface") val grpcPort = system.settings.config.getInt("shopping-order-service.grpc.port") val grpcService = new ShoppingOrderServiceImpl ShoppingOrderServer.start(grpcInterface, grpcPort, system, grpcService) } }
The grpc.port configuration is defined in local1.conf , which is included in the generated template project.
|
2. Create the Projection
The new Projection for shopping-cart-service
events will be similar to the one we developed to track item popularity, but when it receives ShoppingCart.CheckedOut
events, it will call the ShoppingOrderService
.
Create the Projection as follows:
-
Include the service definition by copying the
shopping_order_service.proto
file from theshopping-order-service
to theshopping-cart-service/src/main/protobuf
directory. -
Generate code by compiling the
shopping-cart-service
project:mvn compile
sbt compile
-
Add a
SendOrderProjectionHandler
class in theshopping-cart-service
project. This is the ProjectionHandler
for processing the events:- Java
-
src/main/java/shopping/cart/SendOrderProjectionHandler.java:
package shopping.cart; import static akka.Done.done; import akka.Done; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.persistence.query.typed.EventEnvelope; import akka.projection.r2dbc.javadsl.R2dbcHandler; import akka.projection.r2dbc.javadsl.R2dbcSession; import java.time.Duration; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import shopping.order.proto.Item; import shopping.order.proto.OrderRequest; import shopping.order.proto.ShoppingOrderService; public final class SendOrderProjectionHandler extends R2dbcHandler<EventEnvelope<ShoppingCart.Event>> { private final Logger log = LoggerFactory.getLogger(getClass()); private final ClusterSharding sharding; private final Duration timeout; private final ShoppingOrderService orderService; private final String slice; public SendOrderProjectionHandler( ActorSystem<?> system, ShoppingOrderService orderService, String slice) { (1) this.slice = slice; sharding = ClusterSharding.get(system); timeout = system.settings().config().getDuration("shopping-cart-service.ask-timeout"); this.orderService = orderService; } @Override public CompletionStage<Done> process( R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) { return envelope.event() instanceof ShoppingCart.CheckedOut checkedOut ? sendOrder(checkedOut) : CompletableFuture.completedFuture(done()); } private CompletionStage<Done> sendOrder(ShoppingCart.CheckedOut checkout) { var entityRef = sharding.entityRefFor(ShoppingCart.ENTITY_KEY, checkout.cartId()); var reply = entityRef.ask(ShoppingCart.Get::new, timeout); return reply.thenCompose( cart -> { (2) List<Item> protoItems = cart.items().entrySet().stream() .map( entry -> Item.newBuilder() .setItemId(entry.getKey()) .setQuantity(entry.getValue()) .build()) .collect(Collectors.toList()); log.info( "Sending order of {} items for cart {}.", cart.items().size(), checkout.cartId()); var orderRequest = OrderRequest.newBuilder() .setCartId(checkout.cartId()) .addAllItems(protoItems) .build(); return orderService.order(orderRequest).thenApply(response -> done()); (3) }); } }
1 ShoppingOrderService
is the gRPC client2 Retrieve the full shopping cart information from the entity. In the order we need to include the list of items and their quantities. That information is not included in the ShoppingCart.CheckedOut
event, but we can retrieve it by asking theShoppingCart
entity for it.3 Call the ShoppingOrderService
. If the call to theShoppingOrderService
fails, the returnedCompletionStage<Done>
fails and the Projection is automatically restarted from the previously saved offset. This will result in retrying the call to theShoppingOrderService
. Since the Projection hasat-least-once
semantics, theShoppingOrderService
must be idempotent, that is, it must gracefully handle duplicate order attempts for the samecartId
. - Scala
-
src/main/scala/shopping/cart/SendOrderProjectionHandler.scala:
package shopping.cart import akka.Done import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.persistence.query.typed.EventEnvelope import akka.projection.r2dbc.scaladsl.{ R2dbcHandler, R2dbcSession } import akka.util.Timeout import org.slf4j.LoggerFactory import shopping.order.proto.{ Item, OrderRequest, ShoppingOrderService } import scala.concurrent.{ ExecutionContext, Future } class SendOrderProjectionHandler( system: ActorSystem[_], orderService: ShoppingOrderService) (1) extends R2dbcHandler[EventEnvelope[ShoppingCart.Event]] { private val log = LoggerFactory.getLogger(getClass) private implicit val ec: ExecutionContext = system.executionContext private val sharding = ClusterSharding(system) implicit private val timeout: Timeout = Timeout.create( system.settings.config.getDuration("shopping-cart-service.ask-timeout")) override def process( session: R2dbcSession, envelope: EventEnvelope[ShoppingCart.Event]): Future[Done] = { envelope.event match { case checkout: ShoppingCart.CheckedOut => sendOrder(checkout) case _ => // this projection is only interested in CheckedOut events Future.successful(Done) } } private def sendOrder(checkout: ShoppingCart.CheckedOut): Future[Done] = { val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, checkout.cartId) entityRef.ask(ShoppingCart.Get).flatMap { cart => (2) val items = cart.items.iterator.map { case (itemId, quantity) => Item(itemId, quantity) }.toList log.info( "Sending order of {} items for cart {}.", items.size, checkout.cartId) val orderReq = OrderRequest(checkout.cartId, items) orderService.order(orderReq).map(_ => Done) (3) } } }
1 ShoppingOrderService
is the gRPC client2 Retrieve the full shopping cart information from the entity. In the order we need to include the list of items and their quantities. That information is not included in the ShoppingCart.CheckedOut
event, but we can retrieve it by asking theShoppingCart
entity for it.3 Call the ShoppingOrderService
. If the call to theShoppingOrderService
fails, the returnedFuture[Done]
fails and the Projection is automatically restarted from the previously saved offset. This will result in retrying the call to theShoppingOrderService
. Since the Projection hasat-least-once
semantics, theShoppingOrderService
must be idempotent, that is, it must gracefully handle duplicate order attempts for the samecartId
.
3. Initialize the Projection
We will follow a very similar approach to the one we used for the query Projection. So, we just need to initialize it as follows:
-
Place the initialization code of the Projection in an
SendOrderProjection
object: class:- Java
-
src/main/java/shopping/cart/SendOrderProjection.java:
package shopping.cart; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.ShardedDaemonProcessSettings; import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess; import akka.japi.Pair; import akka.persistence.query.Offset; import akka.persistence.query.typed.EventEnvelope; import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal; import akka.projection.ProjectionBehavior; import akka.projection.ProjectionId; import akka.projection.eventsourced.javadsl.EventSourcedProvider; import akka.projection.javadsl.ExactlyOnceProjection; import akka.projection.javadsl.SourceProvider; import akka.projection.r2dbc.R2dbcProjectionSettings; import akka.projection.r2dbc.javadsl.R2dbcProjection; import java.util.List; import java.util.Optional; import shopping.order.proto.ShoppingOrderService; public class SendOrderProjection { private SendOrderProjection() {} public static void init(ActorSystem<?> system, ShoppingOrderService orderService) { ShardedDaemonProcess.get(system) .initWithContext( ProjectionBehavior.Command.class, "SendOrderProjection", 4, daemonContext -> { List<Pair<Integer, Integer>> sliceRanges = EventSourcedProvider.sliceRanges( system, R2dbcReadJournal.Identifier(), daemonContext.totalProcesses()); Pair<Integer, Integer> sliceRange = sliceRanges.get(daemonContext.processNumber()); return ProjectionBehavior.create(createProjection(system, orderService, sliceRange)); }, ShardedDaemonProcessSettings.create(system), Optional.of(ProjectionBehavior.stopMessage())); } private static ExactlyOnceProjection<Offset, EventEnvelope<ShoppingCart.Event>> createProjection( ActorSystem<?> system, ShoppingOrderService shoppingOrderService, Pair<Integer, Integer> sliceRange) { int minSlice = sliceRange.first(); int maxSlice = sliceRange.second(); SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider = EventSourcedProvider.eventsBySlices( system, R2dbcReadJournal.Identifier(), "ShoppingCart", minSlice, maxSlice); String slice = "carts-" + minSlice + "-" + maxSlice; Optional<R2dbcProjectionSettings> settings = Optional.empty(); return R2dbcProjection.exactlyOnce( ProjectionId.of("SendOrderProjection", slice), settings, sourceProvider, () -> new SendOrderProjectionHandler(system, shoppingOrderService, slice), system); } }
- Scala
-
src/main/scala/shopping/cart/SendOrderProjection.scala:
package shopping.cart import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.ShardedDaemonProcessSettings import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess import akka.persistence.query.Offset import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.SourceProvider import akka.projection.{ Projection, ProjectionBehavior, ProjectionId } import shopping.order.proto.ShoppingOrderService object SendOrderProjection { def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = { def sourceProvider(sliceRange: Range) : SourceProvider[Offset, EventEnvelope[ShoppingCart.Event]] = EventSourcedProvider.eventsBySlices[ShoppingCart.Event]( system, readJournalPluginId = R2dbcReadJournal.Identifier, "ShoppingCart", sliceRange.min, sliceRange.max) def projection( sliceRange: Range): Projection[EventEnvelope[ShoppingCart.Event]] = { val minSlice = sliceRange.min val maxSlice = sliceRange.max val projectionId = ProjectionId("SendOrderProjection", s"carts-$minSlice-$maxSlice") R2dbcProjection.atLeastOnce( projectionId, settings = None, sourceProvider(sliceRange), handler = () => new SendOrderProjectionHandler(system, orderService))( system) } ShardedDaemonProcess(system).initWithContext( name = "SendOrderProjection", initialNumberOfInstances = 4, behaviorFactory = { daemonContext => val sliceRanges = EventSourcedProvider.sliceRanges( system, R2dbcReadJournal.Identifier, daemonContext.totalProcesses) val sliceRange = sliceRanges(daemonContext.processNumber) ProjectionBehavior(projection(sliceRange)) }, ShardedDaemonProcessSettings(system), stopMessage = ProjectionBehavior.Stop) } }
-
In
Main
, invoke theSendOrderProjection.init
and create the gRPC client for theShoppingOrderService
like this:- Java
-
public class Main { public static void main(String[] args) { ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "shopping-cart-service"); try { init(system, orderServiceClient(system)); } catch (Exception e) { logger.error("Terminating due to initialization failure.", e); system.terminate(); } } public static void init(ActorSystem<Void> system, ShoppingOrderService orderService) { // ... SendOrderProjection.init(system, orderService); (1) // ... } static ShoppingOrderService orderServiceClient(ActorSystem<?> system) { (2) GrpcClientSettings orderServiceClientSettings = GrpcClientSettings.connectToServiceAt( system.settings().config().getString("shopping-order-service.host"), system.settings().config().getInt("shopping-order-service.port"), system) .withTls(false); return ShoppingOrderServiceClient.create(orderServiceClientSettings, system); } }
1 Loads host and port for where to find the service from configuration. 2 Creates the client for the ShoppingOrderService
. - Scala
-
// ... def init(system: ActorSystem[_], orderService: ShoppingOrderService): Unit = { // ... SendOrderProjection.init(system, orderService) (1) // ... } protected def orderServiceClient( (2) system: ActorSystem[_]): ShoppingOrderService = { val orderServiceClientSettings = GrpcClientSettings .connectToServiceAt( system.settings.config.getString("shopping-order-service.host"), system.settings.config.getInt("shopping-order-service.port"))(system) .withTls(false) ShoppingOrderServiceClient(orderServiceClientSettings)(system) }
1 Loads host and port for where to find the service from configuration. 2 Creates the client for the ShoppingOrderService
.
-
The gRPC client is using service discovery to locate the
ShoppingOrderService
. For local development add the following tosrc/main/resources/local-shared.conf
, which is loaded when running locally:shopping-order-service.host = "localhost" shopping-order-service.port = 8301
4. Run locally
Follow these steps to run locally and exercise the new Projection and service:
-
Start PostgreSQL, unless it is already running:
docker compose up -d
-
Run the
shopping-order-service
with:# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local1.conf
sbt -Dconfig.resource=local1.conf run
-
Keep the
shopping-order-service
running, and in another terminal run theshopping-cart-service
with:# make sure to compile before running exec:exec mvn compile exec:exec -DAPP_CONFIG=local1.conf
sbt -Dconfig.resource=local1.conf run
4.1. Exercise the service
Use grpcurl
to exercise the service:
-
Try the new order service directly (on port 8301):
grpcurl -d '{"cartId":"cart1", "items":[{"itemId":"socks", "quantity":3}, {"itemId":"t-shirt", "quantity":2}]}' -plaintext 127.0.0.1:8301 shoppingorder.ShoppingOrderService.Order
-
Use the checkout in the shopping cart service with
grpcurl
(note the different port number):grpcurl -d '{"cartId":"cart1", "itemId":"scissors", "quantity":1}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.AddItem
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
-
In the terminal of the
shopping-order-service
you should see the log of the order:Order 12 items from cart cart1