Local Drone Delivery Selection
In the previous step of the guide we implemented the means for the cloud service to keep track of restaurant and the queue of registered deliveries for each.
We want to replicate the registered events to each local-drone-control PoP so that the drones close to it can pick up orders and perform the deliveries.
Again we will use Akka Projection gRPC to do service-to-service events passing without requiring a message broker in between services.
We will then implement a service allowing the drones to ask the local-drone-control to assign them the closest waiting delivery.
Replication of the delivery events
First we must set up replication of the events from the restaurant-drone-deliveries-service.
The regular Akka Projection gRPC behavior is that the consumer connects to the producer, in this case the local-drone-control being the consumer connecting to the cloud.
To implement this we define an EventProducerSource
and create a gRPC request handler for it. We use a protobuf message that we transform the internal domain event DeliveryRegistered
using a Transformation
. Any other message type is filtered out and not replicated to the consumers using the orElseMapper
:
- Scala
-
source
package central.deliveries import akka.actor.typed.ActorSystem import akka.persistence.query.typed.EventEnvelope import akka.projection.grpc.producer.EventProducerSettings import akka.projection.grpc.producer.scaladsl.EventProducer import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation import scala.concurrent.Future object DeliveryEvents { def eventProducerSource( system: ActorSystem[_]): EventProducer.EventProducerSource = { val transformation = Transformation.empty .registerAsyncEnvelopeMapper[ RestaurantDeliveries.DeliveryRegistered, proto.DeliveryRegistered](envelope => Future.successful(Some(transformDeliveryRegistration(envelope)))) // filter all other types of events for the RestaurantDeliveries .registerOrElseMapper(_ => None) val eventProducerSource = EventProducer.EventProducerSource( RestaurantDeliveries.EntityKey.name, // Note: stream id used in consumer to consume this specific stream "delivery-events", transformation, EventProducerSettings(system)) eventProducerSource } private def transformDeliveryRegistration( envelope: EventEnvelope[RestaurantDeliveries.DeliveryRegistered]) : proto.DeliveryRegistered = { val delivery = envelope.event.delivery proto.DeliveryRegistered( deliveryId = delivery.deliveryId, origin = Some(delivery.origin.toProto), destination = Some(delivery.destination.toProto)) } }
- Java
The gRPC request handler is composed with the other gRPC handlers of the service into a single bound server:
- Scala
-
source
val service = ServiceHandler.concatOrNotFound( DroneOverviewServiceHandler.partial(droneOverviewService), RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), ChargingStationServiceHandler.partial(chargingStationService), eventPullHandler, eventPushHandler, ServerReflection.partial( List( DroneOverviewService, RestaurantDeliveriesService, ChargingStationService))) val bound = Http().newServerAt(interface, port).bind(service)
- Java
Since we expect a high number of local-drone-control edge systems connecting to the restaurant-drone-deliveries-service to consume the restaurant orders we configure the events-by-slice-firehose for the projection. The firehose tries to share the stream of events across consumers connected to the same node, instead of each consumer executing its queries in parallel, so that less load is applied to the database.
The firehose is enabled through the following configuration selecting it as query-plugin-id for the akka.projection.grpcproducer
and then configuring the actual underlying akka.persistence.r2dbc.query
as query plugin for the firehose:
- Scala
-
source
akka.projection.grpc { producer { # use the firehose for order events so that the local-drone-control consumers # shares the same firehose instead of each lead to load on the database query-plugin-id = "akka.persistence.query.events-by-slice-firehose" } } akka.persistence.query.events-by-slice-firehose { delegate-query-plugin-id = "akka.persistence.r2dbc.query" }
- Java
Delivery queue actor
The queue of all deliveries for one local-drone-control service is managed by a single durable state actor to keep things simple.
For a high throughput of deliveries, a single actor might become a congestion point and a more clever scheme, for example partitioning the deliveries into multiple queues based on the coarse grained coordinate of the restaurant, could make sense.
Commands and events
The actor accepts the command AddDelivery
to enqueue a delivery, the commands RequestDelivery
and CompleteDelivery
for drones to pick up and complete deliveries and GetCurrentState
for us to be able to inspect the current state of the queue:
- Scala
-
source
sealed trait Command extends CborSerializable final case class AddDelivery( waitingDelivery: WaitingDelivery, replyTo: ActorRef[Done]) extends Command final case class RequestDelivery( droneId: String, droneCoordinates: Coordinates, replyTo: ActorRef[StatusReply[WaitingDelivery]]) extends Command final case class CompleteDelivery( deliveryId: String, replyTo: ActorRef[StatusReply[Done]]) extends Command final case class GetCurrentState(replyTo: ActorRef[State]) extends Command
- Java
State
In the state of the actor, we keep two lists, one is the waiting queue of deliveries, and one is the currently picked up deliveries, waiting for the drone to report back once the delivery completed:
- Scala
-
source
final case class WaitingDelivery( deliveryId: String, from: Coordinates, to: Coordinates) extends CborSerializable final case class DeliveryInProgress( deliveryId: String, droneId: String, pickupTime: Instant) final case class State( waitingDeliveries: Vector[WaitingDelivery], deliveriesInProgress: Vector[DeliveryInProgress]) extends CborSerializable
- Java
Command handler
The command handler de-duplicates orders by id for AddDelivery
to avoid duplicates.
When a RequestDelivery
comes in, we first check that there are deliveries waiting, and if there are we find the one where the restaurant is closest to the current location of the drone. We then move the delivery from the waitingDeliveries
queue to the deliveriesInProgress
list, so that it is not selected again for another drone, and persist the state.
For the CompleteDelivery
command, the delivery is removed from the state and then the updated state is persisted.
- Scala
-
source
private def onCommand(context: ActorContext[Command])( state: State, command: Command): Effect[State] = command match { case AddDelivery(delivery, replyTo) => context.log.info("Adding delivery [{}] to queue", delivery.deliveryId) if (state.waitingDeliveries.contains( delivery) || state.deliveriesInProgress.exists( _.deliveryId == delivery.deliveryId)) Effect.reply(replyTo)(Done) else Effect .persist( state.copy(waitingDeliveries = state.waitingDeliveries :+ delivery)) .thenReply(replyTo)(_ => Done) case RequestDelivery(droneId, droneCoordinates, replyTo) => if (state.waitingDeliveries.isEmpty) Effect.reply(replyTo)(StatusReply.Error("No waiting orders")) else { val closestPickupForDrone = state.waitingDeliveries.minBy(delivery => droneCoordinates.distanceTo(delivery.from)) context.log.info( "Selected next delivery [{}] for drone [{}]", closestPickupForDrone.deliveryId, droneId) // Note: A real application would have to care more about retries/lost data here Effect .persist( state.copy( waitingDeliveries = state.waitingDeliveries.filterNot(_ == closestPickupForDrone), state.deliveriesInProgress :+ DeliveryInProgress( closestPickupForDrone.deliveryId, droneId, Instant.now()))) .thenReply(replyTo)(_ => StatusReply.Success(closestPickupForDrone)) } case CompleteDelivery(deliveryId, replyTo) => if (!state.deliveriesInProgress.exists(_.deliveryId == deliveryId)) { Effect.reply(replyTo)( StatusReply.Error(s"Unknown delivery id: ${deliveryId}")) } else { Effect .persist(state.copy(deliveriesInProgress = state.deliveriesInProgress.filterNot(_.deliveryId == deliveryId))) .thenReply(replyTo)(_ => StatusReply.Success(Done)) } case GetCurrentState(replyTo) => Effect.reply(replyTo)(state) }
- Java
Consuming the delivery events
To consume the stream of delivery events from the central cloud we need to set up a projection. We only want to consume the events for the location id of the particular local-drone-control service, this is done through a consumer filter first excluding all events and then selecting only the events for the configured location id:
- Scala
-
source
package local.drones import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } import akka.grpc.GrpcClientSettings import akka.persistence.Persistence import akka.persistence.query.typed.EventEnvelope import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.grpc.consumer.{ ConsumerFilter, GrpcQuerySettings } import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.Handler import akka.projection.{ ProjectionBehavior, ProjectionId } import akka.util.Timeout /** * Consume delivery events from the cloud and pass to the delivery queue actor */ object DeliveryEvents { def projectionBehavior( queueActor: ActorRef[DeliveriesQueue.Command], settings: Settings)( implicit system: ActorSystem[_]): Behavior[ProjectionBehavior.Command] = { val projectionName: String = "delivery-events" implicit val timeout: Timeout = settings.askTimeout val eventsBySlicesQuery = GrpcReadJournal( GrpcQuerySettings(system).withInitialConsumerFilter( // location id already is in the format of a topic filter expression Vector( ConsumerFilter.excludeAll, ConsumerFilter.IncludeTopics(Set(settings.locationId)))), GrpcClientSettings.fromConfig( system.settings.config .getConfig("akka.projection.grpc.consumer.client")), List(central.deliveries.proto.DeliveryEventsProto.javaDescriptor)) // single projection handling all slices val sliceRanges = Persistence(system).sliceRanges(1) val sliceRange = sliceRanges(0) val projectionKey = s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}" val projectionId = ProjectionId.of(projectionName, projectionKey) val sourceProvider = EventSourcedProvider .eventsBySlices[central.deliveries.proto.DeliveryRegistered]( system, eventsBySlicesQuery, eventsBySlicesQuery.streamId, sliceRange.min, sliceRange.max) import akka.actor.typed.scaladsl.AskPattern._ val handler: Handler[ EventEnvelope[central.deliveries.proto.DeliveryRegistered]] = { envelope => queueActor.ask( DeliveriesQueue.AddDelivery( DeliveriesQueue.WaitingDelivery( deliveryId = envelope.event.deliveryId, from = Coordinates.fromProto(envelope.event.origin.get), to = Coordinates.fromProto(envelope.event.destination.get)), _)) } ProjectionBehavior( R2dbcProjection .atLeastOnceAsync(projectionId, None, sourceProvider, () => handler)) } }
- Java
gRPC services
Drone deliveries
The method for drones to select the next delivery, and to complete it are added to the existing drone service:
- Scala
-
source
syntax = "proto3"; option java_multiple_files = true; option java_package = "local.drones.proto"; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; import "common/coordinates.proto"; package local.drones; // gRPC definition for DroneService, for drones to interact with service DroneService { rpc ReportLocation (ReportLocationRequest) returns (google.protobuf.Empty) {} // deliveries rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {} rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {} // charging rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {} rpc CompleteCharge (CompleteChargeRequest) returns (CompleteChargingResponse) {} } message ReportLocationRequest { string drone_id = 1; common.Coordinates coordinates = 2; // altitude in meters double altitude = 4; } message RequestNextDeliveryRequest { string drone_id = 1; } message RequestNextDeliveryResponse { string delivery_id = 1; common.Coordinates from = 2; common.Coordinates to = 3; } message CompleteDeliveryRequest { string delivery_id = 1; } message GoChargeRequest { string drone_id = 1; string charging_station_id = 2; } message ChargingResponse { oneof response { ChargingStarted started = 1; ComeBackLater come_back_later = 2; }; } message ChargingStarted { google.protobuf.Timestamp expected_complete = 1; } message ComeBackLater { google.protobuf.Timestamp first_slot_free_at = 1; } message CompleteChargeRequest { string charging_station_id = 1; string drone_id = 2; } message CompleteChargingResponse { }
- Java
Implementation of the generated service interface:
- Scala
-
source
package local.drones import akka.Done import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem } import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.EntityRef import akka.grpc.GrpcServiceException import akka.util.Timeout import charging.ChargingStation import com.google.protobuf.empty.Empty import com.google.protobuf.timestamp.Timestamp import io.grpc.Status import org.slf4j.LoggerFactory import local.drones.proto import scala.concurrent.Future import scala.concurrent.TimeoutException class DroneServiceImpl( deliveriesQueue: ActorRef[DeliveriesQueue.Command], chargingStationEntityRefFactory: String => EntityRef[ ChargingStation.Command], settings: Settings)(implicit system: ActorSystem[_]) extends proto.DroneService { import system.executionContext private val logger = LoggerFactory.getLogger(getClass) implicit private val timeout: Timeout = settings.askTimeout private val sharding = ClusterSharding(system) override def reportLocation( in: proto.ReportLocationRequest): Future[Empty] = { val coordinates = in.coordinates.getOrElse { throw new GrpcServiceException( Status.INVALID_ARGUMENT.withDescription( "coordinates are required but missing")) } logger.info( "Report location ({},{},{}) for drone {}", coordinates.latitude, coordinates.longitude, in.altitude, in.droneId) val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId) val reply: Future[Done] = entityRef.ask( Drone.ReportPosition( Position(Coordinates.fromProto(coordinates), in.altitude), _)) val response = reply.map(_ => Empty.defaultInstance) convertError(response) } // #requestNextDelivery override def requestNextDelivery(in: proto.RequestNextDeliveryRequest) : Future[proto.RequestNextDeliveryResponse] = { logger.info("Drone {} requesting next delivery", in.droneId) // get location for drone val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId) // ask for closest delivery val response = for { position <- entityRef.askWithStatus[Position](Drone.GetCurrentPosition(_)) chosenDelivery <- deliveriesQueue .askWithStatus[DeliveriesQueue.WaitingDelivery]( DeliveriesQueue.RequestDelivery(in.droneId, position.coordinates, _)) } yield { proto.RequestNextDeliveryResponse( deliveryId = chosenDelivery.deliveryId, from = Some(chosenDelivery.from.toProto), to = Some(chosenDelivery.to.toProto)) } convertError(response) } // #requestNextDelivery override def completeDelivery( in: proto.CompleteDeliveryRequest): Future[Empty] = { logger.info("Delivery {} completed", in.deliveryId) deliveriesQueue .askWithStatus[Done](DeliveriesQueue.CompleteDelivery(in.deliveryId, _)) .map(_ => Empty.defaultInstance) } // #charging override def goCharge( in: proto.GoChargeRequest): Future[proto.ChargingResponse] = { logger.info( "Requesting charge of {} from {}", in.droneId, in.chargingStationId) val entityRef = chargingStationEntityRefFactory(in.chargingStationId) val response = entityRef .askWithStatus[ChargingStation.StartChargingResponse]( ChargingStation.StartCharging(in.droneId, _)) .map { case ChargingStation.ChargingStarted(_, expectedComplete) => proto.ChargingResponse( proto.ChargingResponse.Response.Started( proto.ChargingStarted(Some(Timestamp(expectedComplete))))) case ChargingStation.AllSlotsBusy(comeBackAt) => proto.ChargingResponse( proto.ChargingResponse.Response .ComeBackLater(proto.ComeBackLater(Some(Timestamp(comeBackAt))))) } convertError(response) } override def completeCharge(in: proto.CompleteChargeRequest) : Future[proto.CompleteChargingResponse] = { logger.info( "Requesting complete charging of {} from {}", in.droneId, in.chargingStationId) val entityRef = chargingStationEntityRefFactory(in.chargingStationId) val response = entityRef .askWithStatus(ChargingStation.CompleteCharging(in.droneId, _)) .map(_ => proto.CompleteChargingResponse.defaultInstance) convertError(response) } // #charging private def convertError[T](response: Future[T]): Future[T] = { response.recoverWith { case _: TimeoutException => Future.failed( new GrpcServiceException( Status.UNAVAILABLE.withDescription("Operation timed out"))) case exc => Future.failed( new GrpcServiceException( Status.INTERNAL.withDescription(exc.getMessage))) } } }
- Java
Inspecting the queue
We add a new gRPC service for inspecting the current state of the queue:
- Scala
-
source
syntax = "proto3"; option java_multiple_files = true; option java_package = "local.drones.proto"; import "google/protobuf/empty.proto"; import "common/coordinates.proto"; package local.drones; // gRPC definition for DroneService, for drones to interact with service DeliveriesQueueService { rpc GetCurrentQueue (google.protobuf.Empty) returns (GetCurrentQueueResponse) {} } message GetCurrentQueueResponse { repeated WaitingDelivery waitingDeliveries = 1; repeated DeliveryInProgress deliveriesInProgress = 2; } message WaitingDelivery { string delivery_id = 1; common.Coordinates from = 2; common.Coordinates to = 3; } message DeliveryInProgress { string delivery_id = 1; string drone_id = 2; }
- Java
Implementation of the generated service interface:
- Scala
-
source
package local.drones import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ ActorRef, ActorSystem } import akka.util.Timeout import com.google.protobuf.empty.Empty import local.drones.proto.DeliveriesQueueService import scala.concurrent.Future class DeliveriesQueueServiceImpl( settings: Settings, deliveriesQueue: ActorRef[DeliveriesQueue.Command])( implicit system: ActorSystem[_]) extends DeliveriesQueueService { import system.executionContext private implicit val timeout: Timeout = settings.askTimeout override def getCurrentQueue( in: Empty): Future[proto.GetCurrentQueueResponse] = { val reply = deliveriesQueue.ask(DeliveriesQueue.GetCurrentState.apply) reply.map { state => proto.GetCurrentQueueResponse( waitingDeliveries = state.waitingDeliveries.map(waiting => proto.WaitingDelivery( deliveryId = waiting.deliveryId, from = Some(waiting.from.toProto), to = Some(waiting.to.toProto))), deliveriesInProgress = state.deliveriesInProgress.map(inProgress => proto.DeliveryInProgress( deliveryId = inProgress.deliveryId, droneId = inProgress.droneId))) } } }
- Java
Finally, we need to start the gRPC server with the two services:
- Scala
-
source
val service = ServiceHandler.concatOrNotFound( DroneOverviewServiceHandler.partial(droneOverviewService), RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), ChargingStationServiceHandler.partial(chargingStationService), eventPullHandler, eventPushHandler, ServerReflection.partial( List( DroneOverviewService, RestaurantDeliveriesService, ChargingStationService))) val bound = Http().newServerAt(interface, port).bind(service)
- Java
Running the sample
The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide:
- Scala drone-scala.zip
- Java drone-java.zip
As this service consumes events from the service built in the previous step, start the local-drone-control service first:
To start the local-drone-control-service:
sbt run
Then start the drone-restaurant-deliveries-service.
As the service needs a PostgreSQL instance running, start that up in a docker container and create the database schema if you did not do that in a previous step of the guide:
docker compose up --wait
docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql
Then start the service:
sbt -Dconfig.resource=local1.conf run
And optionally one or two more Akka cluster nodes, but note that the local drone controls are statically configured to the gRPC port of the first and will only publish events to that node.
sbt -Dconfig.resource=local2.conf run
sbt -Dconfig.resource=local3.conf run
Create a restaurant with grpcurl:
grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant
Set up another restaurant, closest to a different local drone control
grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant
Register a delivery for the first restaurant
grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
Register a delivery for the second restaurant
grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
Now update one or more drones a few times with grpcurl against the local-drone-control:
grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.08125, "latitude": 59.41834}, "altitude": 10}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.08114, "latitude": 59.42122}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
Request a delivery for drone1
grpcurl -d '{"drone_id":"drone1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.RequestNextDelivery
Mark the delivery as completed
grpcurl -d '{"delivery_id":"order1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.CompleteDelivery
Inspect the current state of the local delivery queue
grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue
What’s next?
- Packaging up the two services for deployment