Local Drone Control Service
As the other features of Akka Edge are build on top of Event Sourcing, let us start by implementing a digital twin for drones using the Akka Event Sourced Behavior API.
We will represent drone as an Event Sourced entity, if you are unfamiliar with Event Sourcing, refer to the Event Sourcing section in the Akka guide for an explanation. The Event Sourcing with Akka video is also a good starting point for learning Event Sourcing.
For drones to communicate their location to the digital twin we will create a gRPC API.
When this first step is completed, the drones will be able to report their location and users inspect the current location of a drone connected to the local control center PoP.
Implementing a Drone digital twin
Commands and events
Commands are the public API of an entity that other parts of the system use to interact with it. Entity state can only be changed by commands. The results of commands are emitted as events. A command can request state changes, and different events might be generated depending on the current state of the entity. A command can also be rejected if it has invalid input or can’t be handled by the current state of the entity.
The Drone only accepts two commands: ReportLocation
and GetLocation
. When the reported location changes it always persists a PositionUpdated
event, but additionally, whenever the position means it changed place on a more coarse grained grid, it also emits a CoarseGrainedLocationChanged
event. We will revisit the reason for the coarse grained event in the next step in this guide.
The definition of the commands and events look like this:
- Scala
- Java
-
source
/** This interface defines all the commands (messages) that the Drone actor supports. */ interface Command extends CborSerializable {} /** * A command to report the current position (coordinates and altitude) of the drone. * * <p>It replies with `Done`, which is sent back to the caller when all the events emitted by this * command are successfully persisted. */ public static final class ReportPosition implements Command { public final Position position; public final ActorRef<Done> replyTo; public ReportPosition(Position position, ActorRef<Done> replyTo) { this.position = position; this.replyTo = replyTo; } } /** * A command to query the current position (coordinates and altitude) of the drone. * * <p>It replies with a `StatusReply<Position>`, which is sent back to the caller as a success * if the coordinates are known. If not an error is sent back. */ public static final class GetCurrentPosition implements Command { public final ActorRef<StatusReply<Position>> replyTo; @JsonCreator public GetCurrentPosition(ActorRef<StatusReply<Position>> replyTo) { this.replyTo = replyTo; } } /** This interface defines all the events that the Drone supports. */ interface Event extends CborSerializable {} public static final class PositionUpdated implements Event { public final Position position; @JsonCreator public PositionUpdated(Position position) { this.position = position; } } public static final class CoarseGrainedLocationChanged implements Event { public final CoarseGrainedCoordinates coordinates; @JsonCreator public CoarseGrainedLocationChanged(CoarseGrainedCoordinates coordinates) { this.coordinates = coordinates; } }
State
When the location is reported it is kept as a currentState
, additionally the 100 previous reported locations are kept in a list.
The list of historical locations is not currently used for anything but is here to show that an entity could keep a time window of fine-grained information to make local decisions at a detail level that would be impractical and maybe not even interesting to report to a central cloud service.
- Scala
- Java
-
source
class State implements CborSerializable { Optional<Position> currentPosition; final List<Position> historicalPositions; State() { currentPosition = Optional.empty(); historicalPositions = new ArrayList<>(); } Optional<CoarseGrainedCoordinates> coarseGrainedCoordinates() { return currentPosition.map(p -> CoarseGrainedCoordinates.fromCoordinates(p.coordinates)); } }
Command handler
The Drone entity will receive commands that report when the Drone changes location. We will implement a command handler to process these commands and emit a reply.
The command handler for the Drone looks like this:
- Scala
- Java
-
source
@Override public CommandHandler<Command, Event, State> commandHandler() { return newCommandHandlerBuilder() .forAnyState() .onCommand(ReportPosition.class, this::onReportPosition) .onCommand(GetCurrentPosition.class, this::onGetCurrentPosition) .build(); } private Effect<Event, State> onReportPosition(State state, ReportPosition command) { if (state.currentPosition.equals(Optional.of(command.position))) { // already seen return Effect().reply(command.replyTo, done()); } else { var newCoarseGrainedLocation = CoarseGrainedCoordinates.fromCoordinates(command.position.coordinates); if (state.coarseGrainedCoordinates().equals(Optional.of(newCoarseGrainedLocation))) { // same grid location as before return Effect() .persist(new PositionUpdated(command.position)) .thenReply(command.replyTo, newState -> done()); } else { // no previous location known or new grid location return Effect() .persist( Arrays.asList( new PositionUpdated(command.position), new CoarseGrainedLocationChanged(newCoarseGrainedLocation))) .thenReply(command.replyTo, newState -> done()); } } } private Effect<Event, State> onGetCurrentPosition(State state, GetCurrentPosition command) { return state .currentPosition .map(position -> Effect().reply(command.replyTo, StatusReply.success(position))) .orElse(Effect().reply(command.replyTo, StatusReply.error("Position of drone is unknown"))); }
Note how the handler de-duplicates reports of the same location, immediately replying with an acknowledgement without persisting any change.
In addition to storing the PositionUpdated
event, the command handler also calculates a coarse grained location, and persists a CoarseGrainedLocationChanged
event as well, if it did change from the previous coarse grained location.
Event handler
From commands, the entity creates events that represent state changes. Aligning with the command handler above, the entity’s event handler reacts to events and updates the state. The events are continuously persisted to the Event Journal datastore, while the entity state is kept in memory. Other parts of the application may listen to the events. In case of a restart, the entity recovers its latest state by replaying the events from the Event Journal.
The event handler only reacts to the PositionUpdated
event and ignores the CoarseGrainedLocationChanged
as the coarse grained location can be calculated from the more fine-grained position coordinates:
- Scala
- Java
-
source
@Override public EventHandler<State, Event> eventHandler() { return newEventHandlerBuilder() .forAnyState() .onEvent( PositionUpdated.class, (state, event) -> { if (state.currentPosition.isPresent()) { state.historicalPositions.add(state.currentPosition.get()); if (state.historicalPositions.size() > LOCATION_HISTORY_LIMIT) { state.historicalPositions.remove(0); } } state.currentPosition = Optional.of(event.position); return state; }) .onEvent( CoarseGrainedLocationChanged.class, (state, event) -> // can be derived from position, so not really updating state, // persisted as events for aggregation state) .build(); }
Serialization
The state and events of the entity must be serializable because they are written to the datastore, if the local drone control needs to scale out across several nodes to handle traffic, the commands would also be sent between nodes within the Akka cluster.
The state, commands and events are marked with akka.serialization.jackson.CborSerializable
to use CBOR serialization from Akka Serialization Jackson module for serialization.
Journal storage
In this sample we use Akka Persistence R2DBC with the H2 in-process database, with a file backed storage. H2 requires no additional external database service so can be convenient for both development and production usage where only a single node interacts with the journal and overhead needs to be kept low.
It is of course also possible to instead use a separate standalone database such as for example PostgreSQL.
Config to use H2 looks like this:
- Scala
- Java
-
source
akka { persistence { journal { plugin = "akka.persistence.r2dbc.journal" } snapshot-store { plugin = "akka.persistence.r2dbc.snapshot" } state { plugin = "akka.persistence.r2dbc.state" } r2dbc { # Single projection instance, no need for many topics journal.publish-events-number-of-topics = 1 connection-factory = ${akka.persistence.r2dbc.h2} connection-factory { additional-init = ${akka.projection.r2dbc.default-h2-schema} protocol = "file" # database name is full path to database # default for running from SBT but keeping state across runs database = "./target/drone-db" # inject a path to a persistent volume when running in a container database = ${?H2_DATABASE_PATH} } } } } // #startFromSnapshot akka.persistence.r2dbc.query.start-from-snapshot.enabled = true // #startFromSnapshot
In addition to the configuration, the following additional dependencies are needed in the project build:
- sbt
libraryDependencies ++= Seq( "com.h2database" % "h2" % "2.2.224", "io.r2dbc" % "r2dbc-h2" % "1.0.0.RELEASE" )
- Maven
- Gradle
gRPC Service API for the drone communication
To allow drones to actually use the service we need a public API reachable over the network. For this we will use Akka gRPC giving us a type safe, efficient protocol that allows clients to be written in many languages.
The service descriptor for the API is defined in protobuf, it implements the report command that entity accepts but not one matching the get location command:
- Scala
- Java
-
source
syntax = "proto3"; option java_multiple_files = true; option java_package = "local.drones.proto"; import "common/coordinates.proto"; import "google/protobuf/timestamp.proto"; package local.drones; // gRPC definition for DroneService, for drones to interact with service DroneService { rpc ReportLocation (ReportLocationRequest) returns (ReportLocationResponse) {} // deliveries rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {} rpc CompleteDelivery (CompleteDeliveryRequest) returns (CompleteDeliveryResponse) {} // charging rpc GoCharge (GoChargeRequest) returns (ChargingResponse) {} rpc CompleteCharge (CompleteChargeRequest) returns (CompleteChargingResponse) {} } message ReportLocationRequest { string drone_id = 1; common.Coordinates coordinates = 2; // altitude in meters double altitude = 4; } message ReportLocationResponse { } message RequestNextDeliveryRequest { string drone_id = 1; } message RequestNextDeliveryResponse { string delivery_id = 1; common.Coordinates from = 2; common.Coordinates to = 3; } message CompleteDeliveryRequest { string delivery_id = 1; } message CompleteDeliveryResponse { } message GoChargeRequest { string drone_id = 1; string charging_station_id = 2; } message ChargingResponse { oneof response { ChargingStarted started = 1; ComeBackLater come_back_later = 2; }; } message ChargingStarted { google.protobuf.Timestamp expected_complete = 1; } message ComeBackLater { google.protobuf.Timestamp first_slot_free_at = 1; } message CompleteChargeRequest { string charging_station_id = 1; string drone_id = 2; } message CompleteChargingResponse { }
When compiling the project the Akka gRPC maven plugin generates a service interface for us to implement. Our implementation of it interacts with the entity:
- Scala
- Java
-
source
package local.drones; import static akka.actor.typed.javadsl.AskPattern.*; import akka.Done; import akka.actor.typed.ActorRef; import akka.actor.typed.ActorSystem; import akka.cluster.sharding.typed.javadsl.ClusterSharding; import akka.cluster.sharding.typed.javadsl.EntityRef; import akka.grpc.GrpcServiceException; import akka.pattern.StatusReply; import charging.ChargingStation; import com.google.protobuf.Timestamp; import io.grpc.Status; import java.time.Instant; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeoutException; import java.util.function.Function; import local.drones.proto.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DroneServiceImpl implements DroneService { private static final Logger logger = LoggerFactory.getLogger(DroneServiceImpl.class); private final ActorRef<DeliveriesQueue.Command> deliveriesQueue; private final Function<String, EntityRef<ChargingStation.Command>> chargingStationEntityRefFactory; private final Settings settings; private final ActorSystem<?> system; private final ClusterSharding sharding; public DroneServiceImpl( ActorSystem<?> system, ActorRef<DeliveriesQueue.Command> deliveriesQueue, Function<String, EntityRef<ChargingStation.Command>> chargingStationEntityRefFactory, Settings settings) { this.system = system; this.deliveriesQueue = deliveriesQueue; this.chargingStationEntityRefFactory = chargingStationEntityRefFactory; this.settings = settings; this.sharding = ClusterSharding.get(system); } @Override public CompletionStage<ReportLocationResponse> reportLocation(ReportLocationRequest in) { var coordinates = in.getCoordinates(); if (coordinates == null) { throw new GrpcServiceException( Status.INVALID_ARGUMENT.withDescription("coordinates are required but missing")); } logger.info( "Report location ({},{},{}) for drone {}", coordinates.getLatitude(), coordinates.getLongitude(), in.getAltitude(), in.getDroneId()); var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, in.getDroneId()); CompletionStage<Done> reply = entityRef.ask( replyTo -> new Drone.ReportPosition( new Position(Coordinates.fromProto(coordinates), in.getAltitude()), replyTo), settings.askTimeout); var response = reply.thenApply(done -> ReportLocationResponse.getDefaultInstance()); return convertError(response); } // #requestNextDelivery @Override public CompletionStage<RequestNextDeliveryResponse> requestNextDelivery( RequestNextDeliveryRequest in) { logger.info("Drone {} requesting next delivery", in.getDroneId()); // get location for drone var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, in.getDroneId()); var positionReply = entityRef.askWithStatus( (ActorRef<StatusReply<Position>> replyTo) -> new Drone.GetCurrentPosition(replyTo), settings.askTimeout); // ask for closest delivery CompletionStage<DeliveriesQueue.WaitingDelivery> chosenDeliveryReply = positionReply.thenCompose( position -> askWithStatus( deliveriesQueue, replyTo -> new DeliveriesQueue.RequestDelivery( in.getDroneId(), position.coordinates, replyTo), settings.askTimeout, system.scheduler())); var response = chosenDeliveryReply.thenApply( chosenDelivery -> RequestNextDeliveryResponse.newBuilder() .setDeliveryId(chosenDelivery.deliveryId) .setFrom(chosenDelivery.from.toProto()) .setTo(chosenDelivery.to.toProto()) .build()); return convertError(response); } // #requestNextDelivery @Override public CompletionStage<CompleteDeliveryResponse> completeDelivery(CompleteDeliveryRequest in) { logger.info("Delivery {} completed", in.getDeliveryId()); CompletionStage<Done> completeReply = askWithStatus( deliveriesQueue, replyTo -> new DeliveriesQueue.CompleteDelivery(in.getDeliveryId(), replyTo), settings.askTimeout, system.scheduler()); var reply = completeReply.thenApply(done -> CompleteDeliveryResponse.getDefaultInstance()); return convertError(reply); } // #charging @Override public CompletionStage<ChargingResponse> goCharge(GoChargeRequest in) { logger.info("Requesting charge of {} from {}", in.getDroneId(), in.getChargingStationId()); var entityRef = chargingStationEntityRefFactory.apply(in.getChargingStationId()); CompletionStage<ChargingStation.StartChargingResponse> chargingStationResponse = entityRef.askWithStatus( replyTo -> new ChargingStation.StartCharging(in.getDroneId(), replyTo), settings.askTimeout); var response = chargingStationResponse.thenApply( message -> { if (message instanceof ChargingStation.ChargingStarted) { var expectedComplete = ((ChargingStation.ChargingStarted) message).expectedComplete; return ChargingResponse.newBuilder() .setStarted( ChargingStarted.newBuilder() .setExpectedComplete(instantToProtoTimestamp(expectedComplete)) .build()) .build(); } else if (message instanceof ChargingStation.AllSlotsBusy) { var firstSlotFreeAt = ((ChargingStation.AllSlotsBusy) message).firstSlotFreeAt; return ChargingResponse.newBuilder() .setComeBackLater( ComeBackLater.newBuilder() .setFirstSlotFreeAt(instantToProtoTimestamp(firstSlotFreeAt)) .build()) .build(); } else { throw new IllegalArgumentException( "Unexpected response type " + message.getClass()); } }); return convertError(response); } @Override public CompletionStage<CompleteChargingResponse> completeCharge(CompleteChargeRequest in) { logger.info( "Requesting complete charging of {} from {}", in.getDroneId(), in.getChargingStationId()); var entityRef = chargingStationEntityRefFactory.apply(in.getChargingStationId()); CompletionStage<Done> chargingStationResponse = entityRef.askWithStatus( replyTo -> new ChargingStation.CompleteCharging(in.getDroneId(), replyTo), settings.askTimeout); var response = chargingStationResponse.thenApply(done -> CompleteChargingResponse.getDefaultInstance()); return convertError(response); } // #charging private static Timestamp instantToProtoTimestamp(Instant instant) { return Timestamp.newBuilder() .setSeconds(instant.getEpochSecond()) .setNanos(instant.getNano()) .build(); } private <T> CompletionStage<T> convertError(CompletionStage<T> response) { return response.exceptionally( error -> { if (error instanceof TimeoutException) { throw new GrpcServiceException( Status.UNAVAILABLE.withDescription("Operation timed out")); } else { throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage())); } }); } }
Finally, we need to start the gRPC server, making service implementation available for calls from drones:
- Scala
- Java
-
source
public static void start( String host, int port, ActorSystem<?> system, DroneService droneService, DeliveriesQueueService deliveriesQueueService) { @SuppressWarnings("unchecked") var service = ServiceHandler.concatOrNotFound( DroneServiceHandlerFactory.create(droneService, system), DeliveriesQueueServiceHandlerFactory.create(deliveriesQueueService, system), // ServerReflection enabled to support grpcurl without import-path and proto parameters ServerReflection.create( Arrays.asList(DroneService.description, DeliveriesQueueService.description), system)); var bound = Http.get(system).newServerAt(host, port).bind(service); bound.whenComplete( (binding, error) -> { if (error == null) { binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system); var address = binding.localAddress(); system .log() .info( "Drone control gRPC server started {}:{}", address.getHostString(), address.getPort()); } else { system.log().error("Failed to bind gRPC endpoint, terminating system", error); system.terminate(); } }); }
The Akka HTTP server must be running with HTTP/2 to serve gRPC, this is done through config:
Wiring it all up
The main of this service starts up an actor system with a root behavior which is responsible for bootstrapping all parts of the application.
Note that the bootstrap contains some parts not yet described, the DroneEvents
and the DeliveriesQueue
. They will be covered in the following sections of this guide and can be ignored for now.
- Scala
- Java
-
source
public static void main(String[] args) { ActorSystem.create(rootBehavior(), "local-drone-control"); } private static Behavior<Void> rootBehavior() { return Behaviors.setup( (ActorContext<Void> context) -> { var settings = Settings.create(context.getSystem()); context.getLog().info("Local Drone Control [{}] starting up", settings.locationId); // A single node, but still a cluster, to be able to run sharding for the entities var cluster = Cluster.get(context.getSystem()); cluster.manager().tell(new Join(cluster.selfMember().address())); // keep track of local drones, project aggregate info to the cloud Drone.init(context.getSystem()); context.spawn( DroneEvents.eventToCloudPushBehavior(context.getSystem(), settings), "DroneEventsProjection"); // consume delivery events from the cloud service var deliveriesQueue = context.spawn(DeliveriesQueue.create(), "DeliveriesQueue"); context.spawn( DeliveryEvents.projectionBehavior(context.getSystem(), deliveriesQueue, settings), "DeliveriesProjection"); var deliveriesQueueService = new DeliveriesQueueServiceImpl(context.getSystem(), settings, deliveriesQueue); // replicated charging station entity var chargingStationReplication = ChargingStation.initEdge(context.getSystem(), settings.locationId); var droneService = new DroneServiceImpl( context.getSystem(), deliveriesQueue, chargingStationReplication.entityRefFactory(), settings); var grpcInterface = context .getSystem() .settings() .config() .getString("local-drone-control.grpc.interface"); var grpcPort = context.getSystem().settings().config().getInt("local-drone-control.grpc.port"); LocalDroneControlServer.start( grpcInterface, grpcPort, context.getSystem(), droneService, deliveriesQueueService); return Behaviors.empty(); }); }
Running the sample
The complete sample can be downloaded from GitHub, but note that it also includes the next steps of the guide:
- Scala drone-scala.zip
- Java drone-java.zip
mvn compile exec:exec
Try it with grpcurl:
grpgrpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation
What’s next?
- Consuming the published coarse grained drone locations in a cloud service