Local Drone Delivery Selection

In the previous step of the guide we implemented the means for the cloud service to keep track of restaurant and the queue of registered deliveries for each.

Diagram showing delivery replication to the local drone control services

We want to replicate the registered events to each local-drone-control PoP so that the drones close to it can pick up orders and perform the deliveries.

Again we will use Akka Projection gRPC to do service-to-service events passing without requiring a message broker in between services.

We will then implement a service allowing the drones to ask the local-drone-control to assign them the closest waiting delivery.

Replication of the delivery events

First we must set up replication of the events from the restaurant-drone-deliveries-service.

The regular Akka Projection gRPC behavior is that the consumer connects to the producer, in this case the local-drone-control being the consumer connecting to the cloud.

To implement this we define an EventProducerSource and create a gRPC request handler for it. We use a protobuf message that we transform the internal domain event DeliveryRegistered using a Transformation. Any other message type is filtered out and not replicated to the consumers using the orElseMapper:

Scala
sourcepackage central.deliveries

import akka.actor.typed.ActorSystem
import akka.persistence.query.typed.EventEnvelope
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation

import scala.concurrent.Future

object DeliveryEvents {

  def eventProducerSource(
      system: ActorSystem[_]): EventProducer.EventProducerSource = {
    val transformation = Transformation.empty
      .registerAsyncEnvelopeMapper[
        RestaurantDeliveries.DeliveryRegistered,
        proto.DeliveryRegistered](envelope =>
        Future.successful(Some(transformDeliveryRegistration(envelope))))
      // filter all other types of events for the RestaurantDeliveries
      .registerOrElseMapper(_ => None)

    val eventProducerSource = EventProducer.EventProducerSource(
      RestaurantDeliveries.EntityKey.name,
      // Note: stream id used in consumer to consume this specific stream
      "delivery-events",
      transformation,
      EventProducerSettings(system))

    eventProducerSource
  }

  private def transformDeliveryRegistration(
      envelope: EventEnvelope[RestaurantDeliveries.DeliveryRegistered])
      : proto.DeliveryRegistered = {
    val delivery = envelope.event.delivery
    proto.DeliveryRegistered(
      deliveryId = delivery.deliveryId,
      origin = Some(delivery.origin.toProto),
      destination = Some(delivery.destination.toProto))
  }

}
Java
sourcepackage central.deliveries;

import akka.actor.typed.ActorSystem;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.grpc.producer.EventProducerSettings;
import akka.projection.grpc.producer.javadsl.EventProducerSource;
import akka.projection.grpc.producer.javadsl.Transformation;
import central.deliveries.proto.DeliveryRegistered;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public final class DeliveryEvents {

  // Note: stream id used in consumer to consume this specific stream
  public static final String STREAM_ID = "delivery-events";

  public static EventProducerSource eventProducerSource(ActorSystem<?> system) {
    var transformation =
        Transformation.empty()
            .registerAsyncEnvelopeMapper(
                RestaurantDeliveries.DeliveryRegistered.class,
                DeliveryEvents::transformDeliveryRegistration)
            // exclude all other types of events for the RestaurantDeliveries
            .registerOrElseMapper(envelope -> Optional.empty());

    return new EventProducerSource(
        RestaurantDeliveries.ENTITY_KEY.name(),
        STREAM_ID,
        transformation,
        EventProducerSettings.create(system));
  }

  private static CompletionStage<Optional<DeliveryRegistered>> transformDeliveryRegistration(
      EventEnvelope<RestaurantDeliveries.DeliveryRegistered> envelope) {
    var delivery = envelope.event().delivery;
    return CompletableFuture.completedFuture(
        Optional.of(
            central.deliveries.proto.DeliveryRegistered.newBuilder()
                .setDeliveryId(delivery.deliveryId)
                .setOrigin(delivery.origin.toProto())
                .setDestination(delivery.destination.toProto())
                .build()));
  }
}

The gRPC request handler is composed with the other gRPC handlers of the service into a single bound server:

Scala
sourceval service = ServiceHandler.concatOrNotFound(
  DroneOverviewServiceHandler.partial(droneOverviewService),
  RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService),
  ChargingStationServiceHandler.partial(chargingStationService),
  eventPullHandler,
  eventPushHandler,
  ServerReflection.partial(
    List(
      DroneOverviewService,
      RestaurantDeliveriesService,
      ChargingStationService)))

val bound = Http(system).newServerAt(interface, port).bind(service)
Java
source@SuppressWarnings("unchecked")
var service =
    ServiceHandler.concatOrNotFound(
        DroneOverviewServiceHandlerFactory.create(droneOverviewService, system),
        RestaurantDeliveriesServiceHandlerFactory.create(restaurantDeliveriesService, system),
        ChargingStationServiceHandlerFactory.create(chargingStationService, system),
        eventPullHandler,
        eventPushHandler,
        ServerReflection.create(
            List.of(
                DroneOverviewService.description,
                RestaurantDeliveriesService.description,
                ChargingStationService.description),
            system));

var bound = Http.get(system).newServerAt(host, port).bind(service);

Since we expect a high number of local-drone-control edge systems connecting to the restaurant-drone-deliveries-service to consume the restaurant orders we configure the events-by-slice-firehose for the projection. The firehose tries to share the stream of events across consumers connected to the same node, instead of each consumer executing its queries in parallel, so that less load is applied to the database.

The firehose is enabled through the following configuration selecting it as query-plugin-id for the akka.projection.grpcproducer and then configuring the actual underlying akka.persistence.r2dbc.query as query plugin for the firehose:

Scala
sourceakka.projection.grpc {
  producer {
    # use the firehose for order events so that the local-drone-control consumers
    # shares the same firehose instead of each lead to load on the database
    query-plugin-id = "akka.persistence.query.events-by-slice-firehose"
  }
}

akka.persistence.query.events-by-slice-firehose {
  delegate-query-plugin-id = "akka.persistence.r2dbc.query"
}
Java
sourceakka.projection.grpc {
  producer {
    # use the firehose for order events so that the local-drone-control consumers
    # shares the same firehose instead of each lead to load on the database
    query-plugin-id = "akka.persistence.query.events-by-slice-firehose"
  }
}

akka.persistence.query.events-by-slice-firehose {
  delegate-query-plugin-id = "akka.persistence.r2dbc.query"
}

Delivery queue actor

The queue of all deliveries for one local-drone-control service is managed by a single durable state actor to keep things simple.

For a high throughput of deliveries, a single actor might become a congestion point and a more clever scheme, for example partitioning the deliveries into multiple queues based on the coarse grained coordinate of the restaurant, could make sense.

Commands and events

The actor accepts the command AddDelivery to enqueue a delivery, the commands RequestDelivery and CompleteDelivery for drones to pick up and complete deliveries and GetCurrentState for us to be able to inspect the current state of the queue:

Scala
sourcesealed trait Command extends CborSerializable

final case class AddDelivery(
    waitingDelivery: WaitingDelivery,
    replyTo: ActorRef[Done])
    extends Command

final case class RequestDelivery(
    droneId: String,
    droneCoordinates: Coordinates,
    replyTo: ActorRef[StatusReply[WaitingDelivery]])
    extends Command

final case class CompleteDelivery(
    deliveryId: String,
    replyTo: ActorRef[StatusReply[Done]])
    extends Command

final case class GetCurrentState(replyTo: ActorRef[State]) extends Command
Java
sourcepublic interface Command extends CborSerializable {}

public static final class AddDelivery implements Command {
  public final WaitingDelivery delivery;
  public final ActorRef<Done> replyTo;

  public AddDelivery(WaitingDelivery delivery, ActorRef<Done> replyTo) {
    this.delivery = delivery;
    this.replyTo = replyTo;
  }
}

public static final class RequestDelivery implements Command {
  public final String droneId;
  public final Coordinates droneCoordinates;
  public final ActorRef<StatusReply<WaitingDelivery>> replyTo;

  public RequestDelivery(
      String droneId,
      Coordinates droneCoordinates,
      ActorRef<StatusReply<WaitingDelivery>> replyTo) {
    this.droneId = droneId;
    this.droneCoordinates = droneCoordinates;
    this.replyTo = replyTo;
  }
}

public static final class CompleteDelivery implements Command {
  public final String deliveryId;
  public final ActorRef<StatusReply<Done>> replyTo;

  public CompleteDelivery(String deliveryId, ActorRef<StatusReply<Done>> replyTo) {
    this.deliveryId = deliveryId;
    this.replyTo = replyTo;
  }
}

public static final class GetCurrentState implements Command {
  public final ActorRef<State> replyTo;

  @JsonCreator
  public GetCurrentState(ActorRef<State> replyTo) {
    this.replyTo = replyTo;
  }
}

State

In the state of the actor, we keep two lists, one is the waiting queue of deliveries, and one is the currently picked up deliveries, waiting for the drone to report back once the delivery completed:

Scala
sourcefinal case class WaitingDelivery(
    deliveryId: String,
    from: Coordinates,
    to: Coordinates)
    extends CborSerializable

final case class DeliveryInProgress(
    deliveryId: String,
    droneId: String,
    pickupTime: Instant)

final case class State(
    waitingDeliveries: Vector[WaitingDelivery],
    deliveriesInProgress: Vector[DeliveryInProgress])
    extends CborSerializable
Java
sourcepublic static final class State implements CborSerializable {
  public final List<WaitingDelivery> waitingDeliveries;
  public final List<DeliveryInProgress> deliveriesInProgress;

  public State() {
    waitingDeliveries = new ArrayList<>();
    deliveriesInProgress = new ArrayList<>();
  }

  public State(
      List<WaitingDelivery> waitingDeliveries, List<DeliveryInProgress> deliveriesInProgress) {
    this.waitingDeliveries = waitingDeliveries;
    this.deliveriesInProgress = deliveriesInProgress;
  }
}

public static final class WaitingDelivery implements CborSerializable {
  public final String deliveryId;
  public final Coordinates from;
  public final Coordinates to;

  public WaitingDelivery(String deliveryId, Coordinates from, Coordinates to) {
    this.deliveryId = deliveryId;
    this.from = from;
    this.to = to;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    WaitingDelivery that = (WaitingDelivery) o;

    if (!deliveryId.equals(that.deliveryId)) return false;
    if (!from.equals(that.from)) return false;
    return to.equals(that.to);
  }

  @Override
  public int hashCode() {
    int result = deliveryId.hashCode();
    result = 31 * result + from.hashCode();
    result = 31 * result + to.hashCode();
    return result;
  }

  @Override
  public String toString() {
    return "WaitingDelivery{"
        + "deliveryId='"
        + deliveryId
        + '\''
        + ", from="
        + from
        + ", to="
        + to
        + '}';
  }
}

public static final class DeliveryInProgress {
  public final String deliveryId;
  public final String droneId;
  public final Instant pickupTime;

  public DeliveryInProgress(String deliveryId, String droneId, Instant pickupTime) {
    this.deliveryId = deliveryId;
    this.droneId = droneId;
    this.pickupTime = pickupTime;
  }
}

Command handler

The command handler de-duplicates orders by id for AddDelivery to avoid duplicates.

When a RequestDelivery comes in, we first check that there are deliveries waiting, and if there are we find the one where the restaurant is closest to the current location of the drone. We then move the delivery from the waitingDeliveries queue to the deliveriesInProgress list, so that it is not selected again for another drone, and persist the state.

For the CompleteDelivery command, the delivery is removed from the state and then the updated state is persisted.

Scala
sourceprivate def onCommand(context: ActorContext[Command])(
    state: State,
    command: Command): Effect[State] =
  command match {
    case AddDelivery(delivery, replyTo) =>
      context.log.info("Adding delivery [{}] to queue", delivery.deliveryId)
      if (state.waitingDeliveries.contains(
          delivery) || state.deliveriesInProgress.exists(
          _.deliveryId == delivery.deliveryId))
        Effect.reply(replyTo)(Done)
      else
        Effect
          .persist(
            state.copy(waitingDeliveries =
              state.waitingDeliveries :+ delivery))
          .thenReply(replyTo)(_ => Done)

    case RequestDelivery(droneId, droneCoordinates, replyTo) =>
      if (state.waitingDeliveries.isEmpty)
        Effect.reply(replyTo)(StatusReply.Error("No waiting orders"))
      else {
        val closestPickupForDrone = state.waitingDeliveries.minBy(delivery =>
          droneCoordinates.distanceTo(delivery.from))
        context.log.info(
          "Selected next delivery [{}] for drone [{}]",
          closestPickupForDrone.deliveryId,
          droneId)
        // Note: A real application would have to care more about retries/lost data here
        Effect
          .persist(
            state.copy(
              waitingDeliveries =
                state.waitingDeliveries.filterNot(_ == closestPickupForDrone),
              state.deliveriesInProgress :+ DeliveryInProgress(
                closestPickupForDrone.deliveryId,
                droneId,
                Instant.now())))
          .thenReply(replyTo)(_ => StatusReply.Success(closestPickupForDrone))
      }

    case CompleteDelivery(deliveryId, replyTo) =>
      if (!state.deliveriesInProgress.exists(_.deliveryId == deliveryId)) {
        Effect.reply(replyTo)(
          StatusReply.Error(s"Unknown delivery id: ${deliveryId}"))
      } else {
        Effect
          .persist(state.copy(deliveriesInProgress =
            state.deliveriesInProgress.filterNot(_.deliveryId == deliveryId)))
          .thenReply(replyTo)(_ => StatusReply.Success(Done))
      }

    case GetCurrentState(replyTo) =>
      Effect.reply(replyTo)(state)
  }
Java
source@Override
public CommandHandler<Command, State> commandHandler() {
  return newCommandHandlerBuilder()
      .forAnyState()
      .onCommand(AddDelivery.class, this::onAddDelivery)
      .onCommand(RequestDelivery.class, this::onRequestDelivery)
      .onCommand(CompleteDelivery.class, this::onCompleteDelivery)
      .onCommand(GetCurrentState.class, this::onGetCurrentState)
      .build();
}

private Effect<State> onAddDelivery(State state, AddDelivery command) {
  context.getLog().info("Adding delivery [{}] to queue", command.delivery.deliveryId);
  if (state.waitingDeliveries.contains(command.delivery)
      || state.deliveriesInProgress.stream()
          .anyMatch(
              deliveryInProgress ->
                  deliveryInProgress.deliveryId.equals(command.delivery.deliveryId)))
    return Effect().reply(command.replyTo, done());
  else {
    state.waitingDeliveries.add(command.delivery);
    return Effect().persist(state).thenReply(command.replyTo, updatedState -> done());
  }
}

private Effect<State> onRequestDelivery(State state, RequestDelivery command) {
  if (state.waitingDeliveries.isEmpty()) {
    return Effect().reply(command.replyTo, StatusReply.error("No waiting orders"));
  } else {
    var closestPickupForDrone =
        state.waitingDeliveries.stream()
            .min(
                Comparator.comparingLong(
                    delivery -> command.droneCoordinates.distanceTo(delivery.from)))
            .get();
    context
        .getLog()
        .info(
            "Selected next delivery [{}] for drone [{}]",
            closestPickupForDrone.deliveryId,
            command.droneId);
    // Note: A real application would have to care more about retries/lost data here
    state.waitingDeliveries.remove(closestPickupForDrone);
    state.deliveriesInProgress.add(
        new DeliveryInProgress(closestPickupForDrone.deliveryId, command.droneId, Instant.now()));
    return Effect()
        .persist(state)
        .thenReply(command.replyTo, updatedState -> StatusReply.success(closestPickupForDrone));
  }
}

private Effect<State> onCompleteDelivery(State state, CompleteDelivery command) {
  var maybeExisting =
      state.deliveriesInProgress.stream()
          .filter(delivery -> delivery.deliveryId.equals(command.deliveryId))
          .findFirst();
  if (maybeExisting.isEmpty()) {
    return Effect()
        .reply(command.replyTo, StatusReply.error("Unknown delivery id: " + command.deliveryId));
  } else {
    var existing = maybeExisting.get();
    state.deliveriesInProgress.remove(existing);

    return Effect()
        .persist(state)
        .thenReply(command.replyTo, updatedState -> StatusReply.success(done()));
  }
}

private Effect<State> onGetCurrentState(State state, GetCurrentState command) {
  // defensive copy since state is mutable (individual values in the lists are immutable)
  var stateToShare =
      new State(
          new ArrayList<>(state.waitingDeliveries), new ArrayList<>(state.deliveriesInProgress));
  return Effect().reply(command.replyTo, stateToShare);
}

Consuming the delivery events

To consume the stream of delivery events from the central cloud we need to set up a projection. We only want to consume the events for the location id of the particular local-drone-control service, this is done through a consumer filter first excluding all events and then selecting only the events for the configured location id:

Scala
sourcepackage local.drones

import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.grpc.GrpcClientSettings
import akka.persistence.Persistence
import akka.persistence.query.typed.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.{ ConsumerFilter, GrpcQuerySettings }
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.Handler
import akka.projection.{ ProjectionBehavior, ProjectionId }
import akka.util.Timeout

/**
 * Consume delivery events from the cloud and pass to the delivery queue actor
 */
object DeliveryEvents {

  def projectionBehavior(
      queueActor: ActorRef[DeliveriesQueue.Command],
      settings: Settings)(
      implicit system: ActorSystem[_]): Behavior[ProjectionBehavior.Command] = {
    val projectionName: String = "delivery-events"

    implicit val timeout: Timeout = settings.askTimeout

    val eventsBySlicesQuery =
      GrpcReadJournal(
        GrpcQuerySettings(system).withInitialConsumerFilter(
          // location id already is in the format of a topic filter expression
          Vector(
            ConsumerFilter.excludeAll,
            ConsumerFilter.IncludeTopics(Set(settings.locationId)))),
        GrpcClientSettings.fromConfig(
          system.settings.config
            .getConfig("akka.projection.grpc.consumer.client")),
        List(central.deliveries.proto.DeliveryEventsProto.javaDescriptor))

    // single projection handling all slices
    val sliceRanges =
      Persistence(system).sliceRanges(1)
    val sliceRange = sliceRanges(0)
    val projectionKey =
      s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
    val projectionId = ProjectionId.of(projectionName, projectionKey)

    val sourceProvider = EventSourcedProvider
      .eventsBySlices[central.deliveries.proto.DeliveryRegistered](
        system,
        eventsBySlicesQuery,
        eventsBySlicesQuery.streamId,
        sliceRange.min,
        sliceRange.max)

    import akka.actor.typed.scaladsl.AskPattern._
    val handler: Handler[
      EventEnvelope[central.deliveries.proto.DeliveryRegistered]] = {
      envelope =>
        queueActor.ask(
          DeliveriesQueue.AddDelivery(
            DeliveriesQueue.WaitingDelivery(
              deliveryId = envelope.event.deliveryId,
              from = Coordinates.fromProto(envelope.event.origin.get),
              to = Coordinates.fromProto(envelope.event.destination.get)),
            _))
    }

    ProjectionBehavior(
      R2dbcProjection
        .atLeastOnceAsync(projectionId, None, sourceProvider, () => handler))

  }

}
Java
sourcepackage local.drones;

import static akka.actor.typed.javadsl.AskPattern.*;

import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.grpc.GrpcClientSettings;
import akka.persistence.Persistence;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.grpc.consumer.ConsumerFilter;
import akka.projection.grpc.consumer.GrpcQuerySettings;
import akka.projection.grpc.consumer.javadsl.GrpcReadJournal;
import akka.projection.javadsl.Handler;
import akka.projection.r2dbc.javadsl.R2dbcProjection;
import central.deliveries.proto.DeliveryRegistered;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

/** Consume delivery events from the cloud and pass to the delivery queue actor */
public class DeliveryEvents {

  public static Behavior<ProjectionBehavior.Command> projectionBehavior(
      ActorSystem<?> system, ActorRef<DeliveriesQueue.Command> queueActor, Settings settings) {
    var projectionName = "delivery-events";

    var eventsBySlicesQuery =
        GrpcReadJournal.create(
            system,
            GrpcQuerySettings.create(system)
                .withInitialConsumerFilter(
                    // location id already is in the format of a topic filter expression
                    Arrays.asList(
                        ConsumerFilter.excludeAll(),
                        new ConsumerFilter.IncludeTopics(
                            Collections.singleton(settings.locationId)))),
            GrpcClientSettings.fromConfig(
                system.settings().config().getConfig("akka.projection.grpc.consumer.client"),
                system),
            Arrays.asList(central.deliveries.proto.DeliveryEvents.getDescriptor()));

    // single projection handling all slices
    var sliceRanges = Persistence.get(system).getSliceRanges(1);
    var sliceRange = sliceRanges.get(0);
    var projectionKey =
        eventsBySlicesQuery.streamId() + "-" + sliceRange.first() + "-" + sliceRange.second();

    var projectionId = ProjectionId.of(projectionName, projectionKey);

    var sourceProvider =
        EventSourcedProvider.<central.deliveries.proto.DeliveryRegistered>eventsBySlices(
            system,
            eventsBySlicesQuery,
            eventsBySlicesQuery.streamId(),
            sliceRange.first(),
            sliceRange.second());

    var handler =
        Handler.fromFunction(
            (EventEnvelope<DeliveryRegistered> envelope) ->
                ask(
                    queueActor,
                    (ActorRef<Done> replyTo) ->
                        new DeliveriesQueue.AddDelivery(
                            new DeliveriesQueue.WaitingDelivery(
                                envelope.event().getDeliveryId(),
                                Coordinates.fromProto(envelope.event().getOrigin()),
                                Coordinates.fromProto(envelope.event().getDestination())),
                            replyTo),
                    settings.askTimeout,
                    system.scheduler()));

    return ProjectionBehavior.create(
        R2dbcProjection.atLeastOnceAsync(
            projectionId, Optional.empty(), sourceProvider, () -> handler, system));
  }
}

gRPC services

Drone deliveries

The method for drones to select the next delivery, and to complete it are added to the existing drone service:

Scala
sourcesyntax = "proto3";

option java_multiple_files = true;
option java_package = "local.drones.proto";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "common/coordinates.proto";

package local.drones;

// gRPC definition for DroneService, for drones to interact with

service DroneService {
    rpc ReportLocation (ReportLocationRequest) returns (google.protobuf.Empty) {}

    // deliveries
    rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {}
    rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {}

    // charging
    rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {}
    rpc CompleteCharge (CompleteChargeRequest) returns (CompleteChargingResponse) {}
}


message ReportLocationRequest {
    string drone_id = 1;
    common.Coordinates coordinates = 2;
    // altitude in meters
    double altitude = 4;
}

message RequestNextDeliveryRequest {
    string drone_id = 1;
}

message RequestNextDeliveryResponse {
    string delivery_id = 1;
    common.Coordinates from = 2;
    common.Coordinates to = 3;
}

message CompleteDeliveryRequest {
    string delivery_id = 1;
}

message GoChargeRequest {
    string drone_id = 1;
    string charging_station_id = 2;
}

message ChargingResponse {
    oneof response {
        ChargingStarted started = 1;
        ComeBackLater come_back_later = 2;
    };
}

message ChargingStarted {
    google.protobuf.Timestamp expected_complete = 1;
}

message ComeBackLater {
    google.protobuf.Timestamp first_slot_free_at = 1;
}

message CompleteChargeRequest {
    string charging_station_id = 1;
    string drone_id = 2;
}

message CompleteChargingResponse {
}
Java
sourcesyntax = "proto3";

option java_multiple_files = true;
option java_package = "local.drones.proto";

import "common/coordinates.proto";
import "google/protobuf/timestamp.proto";

package local.drones;

// gRPC definition for DroneService, for drones to interact with

service DroneService {
    rpc ReportLocation (ReportLocationRequest) returns (ReportLocationResponse) {}

    // deliveries
    rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {}
    rpc CompleteDelivery (CompleteDeliveryRequest) returns (CompleteDeliveryResponse) {}

    // charging
    rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {}
    rpc CompleteCharge (CompleteChargeRequest) returns (CompleteChargingResponse) {}
}


message ReportLocationRequest {
    string drone_id = 1;
    common.Coordinates coordinates = 2;
    // altitude in meters
    double altitude = 4;
}

message ReportLocationResponse {
}

message RequestNextDeliveryRequest {
    string drone_id = 1;
}

message RequestNextDeliveryResponse {
    string delivery_id = 1;
    common.Coordinates from = 2;
    common.Coordinates to = 3;
}

message CompleteDeliveryRequest {
    string delivery_id = 1;
}

message CompleteDeliveryResponse {
}

message GoChargeRequest {
    string drone_id = 1;
    string charging_station_id = 2;
}

message ChargingResponse {
    oneof response {
        ChargingStarted started = 1;
        ComeBackLater come_back_later = 2;
    };
}

message ChargingStarted {
    google.protobuf.Timestamp expected_complete = 1;
}

message ComeBackLater {
    google.protobuf.Timestamp first_slot_free_at = 1;
}

message CompleteChargeRequest {
    string charging_station_id = 1;
    string drone_id = 2;
}

message CompleteChargingResponse {
}

Implementation of the generated service interface:

Scala
sourcepackage local.drones

import akka.Done
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.EntityRef
import akka.grpc.GrpcServiceException
import akka.util.Timeout
import charging.ChargingStation
import com.google.protobuf.empty.Empty
import com.google.protobuf.timestamp.Timestamp
import io.grpc.Status
import org.slf4j.LoggerFactory
import local.drones.proto

import scala.concurrent.Future
import scala.concurrent.TimeoutException

class DroneServiceImpl(
    deliveriesQueue: ActorRef[DeliveriesQueue.Command],
    chargingStationEntityRefFactory: String => EntityRef[
      ChargingStation.Command],
    settings: Settings)(implicit system: ActorSystem[_])
    extends proto.DroneService {

  import system.executionContext

  private val logger = LoggerFactory.getLogger(getClass)

  implicit private val timeout: Timeout = settings.askTimeout

  private val sharding = ClusterSharding(system)

  override def reportLocation(
      in: proto.ReportLocationRequest): Future[Empty] = {
    val coordinates = in.coordinates.getOrElse {
      throw new GrpcServiceException(
        Status.INVALID_ARGUMENT.withDescription(
          "coordinates are required but missing"))
    }
    logger.info(
      "Report location ({},{},{}) for drone {}",
      coordinates.latitude,
      coordinates.longitude,
      in.altitude,
      in.droneId)
    val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId)
    val reply: Future[Done] = entityRef.ask(
      Drone.ReportPosition(
        Position(Coordinates.fromProto(coordinates), in.altitude),
        _))
    val response = reply.map(_ => Empty.defaultInstance)
    convertError(response)
  }

  // #requestNextDelivery
  override def requestNextDelivery(in: proto.RequestNextDeliveryRequest)
      : Future[proto.RequestNextDeliveryResponse] = {
    logger.info("Drone {} requesting next delivery", in.droneId)

    // get location for drone
    val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId)

    // ask for closest delivery
    val response = for {
      position <- entityRef.askWithStatus[Position](Drone.GetCurrentPosition(_))
      chosenDelivery <- deliveriesQueue
        .askWithStatus[DeliveriesQueue.WaitingDelivery](
          DeliveriesQueue.RequestDelivery(in.droneId, position.coordinates, _))
    } yield {
      proto.RequestNextDeliveryResponse(
        deliveryId = chosenDelivery.deliveryId,
        from = Some(chosenDelivery.from.toProto),
        to = Some(chosenDelivery.to.toProto))
    }

    convertError(response)
  }
  // #requestNextDelivery
  override def completeDelivery(
      in: proto.CompleteDeliveryRequest): Future[Empty] = {
    logger.info("Delivery {} completed", in.deliveryId)

    deliveriesQueue
      .askWithStatus[Done](DeliveriesQueue.CompleteDelivery(in.deliveryId, _))
      .map(_ => Empty.defaultInstance)
  }

  // #charging
  override def goCharge(
      in: proto.GoChargeRequest): Future[proto.ChargingResponse] = {
    logger.info(
      "Requesting charge of {} from {}",
      in.droneId,
      in.chargingStationId)
    val entityRef = chargingStationEntityRefFactory(in.chargingStationId)
    val response = entityRef
      .askWithStatus[ChargingStation.StartChargingResponse](
        ChargingStation.StartCharging(in.droneId, _))
      .map {
        case ChargingStation.ChargingStarted(_, expectedComplete) =>
          proto.ChargingResponse(
            proto.ChargingResponse.Response.Started(
              proto.ChargingStarted(Some(Timestamp(expectedComplete)))))
        case ChargingStation.AllSlotsBusy(comeBackAt) =>
          proto.ChargingResponse(
            proto.ChargingResponse.Response
              .ComeBackLater(proto.ComeBackLater(Some(Timestamp(comeBackAt)))))
      }
    convertError(response)
  }

  override def completeCharge(in: proto.CompleteChargeRequest)
      : Future[proto.CompleteChargingResponse] = {
    logger.info(
      "Requesting complete charging of {} from {}",
      in.droneId,
      in.chargingStationId)

    val entityRef = chargingStationEntityRefFactory(in.chargingStationId)
    val response = entityRef
      .askWithStatus(ChargingStation.CompleteCharging(in.droneId, _))
      .map(_ => proto.CompleteChargingResponse.defaultInstance)

    convertError(response)
  }
  // #charging

  private def convertError[T](response: Future[T]): Future[T] = {
    response.recoverWith {
      case _: TimeoutException =>
        Future.failed(
          new GrpcServiceException(
            Status.UNAVAILABLE.withDescription("Operation timed out")))
      case exc =>
        Future.failed(
          new GrpcServiceException(
            Status.INTERNAL.withDescription(exc.getMessage)))
    }
  }
}
Java
sourcepackage local.drones;

import static akka.actor.typed.javadsl.AskPattern.*;

import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.EntityRef;
import akka.grpc.GrpcServiceException;
import akka.pattern.StatusReply;
import charging.ChargingStation;
import com.google.protobuf.Timestamp;
import io.grpc.Status;
import java.time.Instant;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import local.drones.proto.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DroneServiceImpl implements DroneService {

  private static final Logger logger = LoggerFactory.getLogger(DroneServiceImpl.class);

  private final ActorRef<DeliveriesQueue.Command> deliveriesQueue;
  private final Function<String, EntityRef<ChargingStation.Command>>
      chargingStationEntityRefFactory;
  private final Settings settings;
  private final ActorSystem<?> system;

  private final ClusterSharding sharding;

  public DroneServiceImpl(
      ActorSystem<?> system,
      ActorRef<DeliveriesQueue.Command> deliveriesQueue,
      Function<String, EntityRef<ChargingStation.Command>> chargingStationEntityRefFactory,
      Settings settings) {
    this.system = system;
    this.deliveriesQueue = deliveriesQueue;
    this.chargingStationEntityRefFactory = chargingStationEntityRefFactory;
    this.settings = settings;
    this.sharding = ClusterSharding.get(system);
  }

  @Override
  public CompletionStage<ReportLocationResponse> reportLocation(ReportLocationRequest in) {
    var coordinates = in.getCoordinates();
    if (coordinates == null) {
      throw new GrpcServiceException(
          Status.INVALID_ARGUMENT.withDescription("coordinates are required but missing"));
    }
    logger.info(
        "Report location ({},{},{}) for drone {}",
        coordinates.getLatitude(),
        coordinates.getLongitude(),
        in.getAltitude(),
        in.getDroneId());
    var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, in.getDroneId());
    CompletionStage<Done> reply =
        entityRef.ask(
            replyTo ->
                new Drone.ReportPosition(
                    new Position(Coordinates.fromProto(coordinates), in.getAltitude()), replyTo),
            settings.askTimeout);
    var response = reply.thenApply(done -> ReportLocationResponse.getDefaultInstance());
    return convertError(response);
  }

  // #requestNextDelivery
  @Override
  public CompletionStage<RequestNextDeliveryResponse> requestNextDelivery(
      RequestNextDeliveryRequest in) {
    logger.info("Drone {} requesting next delivery", in.getDroneId());

    // get location for drone
    var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, in.getDroneId());
    var positionReply =
        entityRef.askWithStatus(
            (ActorRef<StatusReply<Position>> replyTo) -> new Drone.GetCurrentPosition(replyTo),
            settings.askTimeout);

    // ask for closest delivery
    CompletionStage<DeliveriesQueue.WaitingDelivery> chosenDeliveryReply =
        positionReply.thenCompose(
            position ->
                askWithStatus(
                    deliveriesQueue,
                    replyTo ->
                        new DeliveriesQueue.RequestDelivery(
                            in.getDroneId(), position.coordinates, replyTo),
                    settings.askTimeout,
                    system.scheduler()));

    var response =
        chosenDeliveryReply.thenApply(
            chosenDelivery ->
                RequestNextDeliveryResponse.newBuilder()
                    .setDeliveryId(chosenDelivery.deliveryId)
                    .setFrom(chosenDelivery.from.toProto())
                    .setTo(chosenDelivery.to.toProto())
                    .build());

    return convertError(response);
  }

  // #requestNextDelivery
  @Override
  public CompletionStage<CompleteDeliveryResponse> completeDelivery(CompleteDeliveryRequest in) {
    logger.info("Delivery {} completed", in.getDeliveryId());

    CompletionStage<Done> completeReply =
        askWithStatus(
            deliveriesQueue,
            replyTo -> new DeliveriesQueue.CompleteDelivery(in.getDeliveryId(), replyTo),
            settings.askTimeout,
            system.scheduler());

    var reply = completeReply.thenApply(done -> CompleteDeliveryResponse.getDefaultInstance());

    return convertError(reply);
  }

  // #charging
  @Override
  public CompletionStage<ChargingResponse> goCharge(GoChargeRequest in) {
    logger.info("Requesting charge of {} from {}", in.getDroneId(), in.getChargingStationId());
    var entityRef = chargingStationEntityRefFactory.apply(in.getChargingStationId());

    CompletionStage<ChargingStation.StartChargingResponse> chargingStationResponse =
        entityRef.askWithStatus(
            replyTo -> new ChargingStation.StartCharging(in.getDroneId(), replyTo),
            settings.askTimeout);

    var response =
        chargingStationResponse.thenApply(
            message -> {
              if (message instanceof ChargingStation.ChargingStarted) {
                var expectedComplete = ((ChargingStation.ChargingStarted) message).expectedComplete;
                return ChargingResponse.newBuilder()
                    .setStarted(
                        ChargingStarted.newBuilder()
                            .setExpectedComplete(instantToProtoTimestamp(expectedComplete))
                            .build())
                    .build();
              } else if (message instanceof ChargingStation.AllSlotsBusy) {
                var firstSlotFreeAt = ((ChargingStation.AllSlotsBusy) message).firstSlotFreeAt;
                return ChargingResponse.newBuilder()
                    .setComeBackLater(
                        ComeBackLater.newBuilder()
                            .setFirstSlotFreeAt(instantToProtoTimestamp(firstSlotFreeAt))
                            .build())
                    .build();
              } else {
                throw new IllegalArgumentException(
                    "Unexpected response type " + message.getClass());
              }
            });

    return convertError(response);
  }

  @Override
  public CompletionStage<CompleteChargingResponse> completeCharge(CompleteChargeRequest in) {
    logger.info(
        "Requesting complete charging of {} from {}", in.getDroneId(), in.getChargingStationId());
    var entityRef = chargingStationEntityRefFactory.apply(in.getChargingStationId());

    CompletionStage<Done> chargingStationResponse =
        entityRef.askWithStatus(
            replyTo -> new ChargingStation.CompleteCharging(in.getDroneId(), replyTo),
            settings.askTimeout);

    var response =
        chargingStationResponse.thenApply(done -> CompleteChargingResponse.getDefaultInstance());

    return convertError(response);
  }
  // #charging
  private static Timestamp instantToProtoTimestamp(Instant instant) {
    return Timestamp.newBuilder()
        .setSeconds(instant.getEpochSecond())
        .setNanos(instant.getNano())
        .build();
  }

  private <T> CompletionStage<T> convertError(CompletionStage<T> response) {
    return response.exceptionally(
        error -> {
          if (error instanceof TimeoutException) {
            throw new GrpcServiceException(
                Status.UNAVAILABLE.withDescription("Operation timed out"));
          } else {
            throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage()));
          }
        });
  }
}

Inspecting the queue

We add a new gRPC service for inspecting the current state of the queue:

Scala
sourcesyntax = "proto3";

option java_multiple_files = true;
option java_package = "local.drones.proto";

import "google/protobuf/empty.proto";
import "common/coordinates.proto";

package local.drones;

// gRPC definition for DroneService, for drones to interact with

service DeliveriesQueueService {
  rpc GetCurrentQueue (google.protobuf.Empty) returns (GetCurrentQueueResponse) {}
}

message GetCurrentQueueResponse {
  repeated WaitingDelivery waitingDeliveries = 1;
  repeated DeliveryInProgress deliveriesInProgress = 2;
}

message WaitingDelivery {
  string delivery_id = 1;
  common.Coordinates from = 2;
  common.Coordinates to = 3;
}

message DeliveryInProgress {
  string delivery_id = 1;
  string drone_id = 2;
}
Java
sourcesyntax = "proto3";

option java_multiple_files = true;
option java_package = "local.drones.proto";

import "common/coordinates.proto";

package local.drones;

// gRPC definition for DroneService, for drones to interact with

service DeliveriesQueueService {
  rpc GetCurrentQueue (GetCurrentQueueRequest) returns (GetCurrentQueueResponse) {}
}

message GetCurrentQueueRequest {
}

message GetCurrentQueueResponse {
  repeated WaitingDelivery waitingDeliveries = 1;
  repeated DeliveryInProgress deliveriesInProgress = 2;
}

message WaitingDelivery {
  string delivery_id = 1;
  common.Coordinates from = 2;
  common.Coordinates to = 3;
}

message DeliveryInProgress {
  string delivery_id = 1;
  string drone_id = 2;
}

Implementation of the generated service interface:

Scala
sourcepackage local.drones

import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.{ ActorRef, ActorSystem }
import akka.util.Timeout
import com.google.protobuf.empty.Empty
import local.drones.proto.DeliveriesQueueService

import scala.concurrent.Future

class DeliveriesQueueServiceImpl(
    settings: Settings,
    deliveriesQueue: ActorRef[DeliveriesQueue.Command])(
    implicit system: ActorSystem[_])
    extends DeliveriesQueueService {

  import system.executionContext
  private implicit val timeout: Timeout = settings.askTimeout

  override def getCurrentQueue(
      in: Empty): Future[proto.GetCurrentQueueResponse] = {
    val reply = deliveriesQueue.ask(DeliveriesQueue.GetCurrentState(_))

    reply.map { state =>
      proto.GetCurrentQueueResponse(
        waitingDeliveries = state.waitingDeliveries.map(waiting =>
          proto.WaitingDelivery(
            deliveryId = waiting.deliveryId,
            from = Some(waiting.from.toProto),
            to = Some(waiting.to.toProto))),
        deliveriesInProgress = state.deliveriesInProgress.map(inProgress =>
          proto.DeliveryInProgress(
            deliveryId = inProgress.deliveryId,
            droneId = inProgress.droneId)))
    }

  }
}
Java
sourcepackage local.drones;

import static akka.actor.typed.javadsl.AskPattern.*;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import local.drones.proto.*;

public class DeliveriesQueueServiceImpl implements DeliveriesQueueService {

  private final ActorSystem<?> system;
  private final Settings settings;
  private final ActorRef<DeliveriesQueue.Command> deliveriesQueue;

  public DeliveriesQueueServiceImpl(
      ActorSystem<?> system, Settings settings, ActorRef<DeliveriesQueue.Command> deliveriesQueue) {
    this.system = system;
    this.settings = settings;
    this.deliveriesQueue = deliveriesQueue;
  }

  @Override
  public CompletionStage<GetCurrentQueueResponse> getCurrentQueue(GetCurrentQueueRequest in) {
    var reply =
        ask(
            deliveriesQueue,
            DeliveriesQueue.GetCurrentState::new,
            settings.askTimeout,
            system.scheduler());

    return reply.thenApply(this::toProto);
  }

  private GetCurrentQueueResponse toProto(DeliveriesQueue.State state) {
    return GetCurrentQueueResponse.newBuilder()
        .addAllWaitingDeliveries(
            state.waitingDeliveries.stream().map(this::waitingToProto).collect(Collectors.toList()))
        .addAllDeliveriesInProgress(
            state.deliveriesInProgress.stream()
                .map(this::deliveryInProgressToProto)
                .collect(Collectors.toList()))
        .build();
  }

  private WaitingDelivery waitingToProto(DeliveriesQueue.WaitingDelivery waiting) {
    return WaitingDelivery.newBuilder()
        .setDeliveryId(waiting.deliveryId)
        .setFrom(waiting.from.toProto())
        .setTo(waiting.to.toProto())
        .build();
  }

  private DeliveryInProgress deliveryInProgressToProto(
      DeliveriesQueue.DeliveryInProgress inProgress) {
    return DeliveryInProgress.newBuilder()
        .setDeliveryId(inProgress.deliveryId)
        .setDroneId(inProgress.droneId)
        .build();
  }
}

Finally, we need to start the gRPC server with the two services:

Scala
sourceval service = ServiceHandler.concatOrNotFound(
  DroneOverviewServiceHandler.partial(droneOverviewService),
  RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService),
  ChargingStationServiceHandler.partial(chargingStationService),
  eventPullHandler,
  eventPushHandler,
  ServerReflection.partial(
    List(
      DroneOverviewService,
      RestaurantDeliveriesService,
      ChargingStationService)))

val bound = Http(system).newServerAt(interface, port).bind(service)
Java
source@SuppressWarnings("unchecked")
var service =
    ServiceHandler.concatOrNotFound(
        DroneOverviewServiceHandlerFactory.create(droneOverviewService, system),
        RestaurantDeliveriesServiceHandlerFactory.create(restaurantDeliveriesService, system),
        ChargingStationServiceHandlerFactory.create(chargingStationService, system),
        eventPullHandler,
        eventPushHandler,
        ServerReflection.create(
            List.of(
                DroneOverviewService.description,
                RestaurantDeliveriesService.description,
                ChargingStationService.description),
            system));

var bound = Http.get(system).newServerAt(host, port).bind(service);

Running the sample

The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide:

As this service consumes events from the service built in the previous step, start the local-drone-control service first:

To start the local-drone-control-service:

sbt run
mvn compile exec:exec

Then start the drone-restaurant-deliveries-service.

As the service needs a PostgreSQL instance running, start that up in a docker container and create the database schema if you did not do that in a previous step of the guide:

docker compose up --wait
docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql

Then start the service:

sbt -Dconfig.resource=local1.conf run

And optionally one or two more Akka cluster nodes, but note that the local drone controls are statically configured to the gRPC port of the first and will only publish events to that node.

sbt -Dconfig.resource=local2.conf run
sbt -Dconfig.resource=local3.conf run
mvn compile exec:exec -DAPP_CONFIG=local1.conf

And optionally one or two more Akka cluster nodes, but note that the local drone controls are statically configured to the gRPC port of the first and will only publish events to that node.

mvn compile exec:exec -DAPP_CONFIG=local2.conf
mvn compile exec:exec -DAPP_CONFIG=local3.conf

Create a restaurant with grpcurl:

grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant

Set up another restaurant, closest to a different local drone control

grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant

Register a delivery for the first restaurant

grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery

Register a delivery for the second restaurant

grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery

Now update one or more drones a few times with grpcurl against the local-drone-control:

grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
 
grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.08125, "latitude": 59.41834}, "altitude": 10}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation

grpcurl -d '{"drone_id":"drone2", "coordinates": {"longitude": 18.08114, "latitude": 59.42122}, "altitude": 8 }' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation

Request a delivery for drone1

grpcurl -d '{"drone_id":"drone1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.RequestNextDelivery

Mark the delivery as completed

grpcurl -d '{"delivery_id":"order1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.CompleteDelivery

Inspect the current state of the local delivery queue

grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue

What’s next?

  • Packaging up the two services for deployment
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.