Part 3: Working with Device Actors

Dependency

Add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}

Introduction

In the previous topics we explained how to view actor systems in the large, that is, how components should be represented, how actors should be arranged in the hierarchy. In this part, we will look at actors in the small by implementing the device actor.

If we were working with objects, we would typically design the API as interfaces, a collection of abstract methods to be filled out by the actual implementation. In the world of actors, protocols take the place of interfaces. While it is not possible to formalize general protocols in the programming language, we can compose their most basic element, messages. So, we will start by identifying the messages we will want to send to device actors.

Typically, messages fall into categories, or patterns. By identifying these patterns, you will find that it becomes easier to choose between them and to implement them. The first example demonstrates the request-respond message pattern.

Identifying messages for devices

The tasks of a device actor will be simple:

  • Collect temperature measurements
  • When asked, report the last measured temperature

However, a device might start without immediately having a temperature measurement. Hence, we need to account for the case where a temperature is not present. This also allows us to test the query part of the actor without the write part present, as the device actor can report an empty result.

The protocol for obtaining the current temperature from the device actor is simple. The actor:

  1. Waits for a request for the current temperature.
  2. Responds to the request with a reply that either:
    • contains the current temperature or,
    • indicates that a temperature is not yet available.

We need two messages, one for the request, and one for the reply. Our first attempt might look like the following:

Scala
sourcefinal case object ReadTemperature
final case class RespondTemperature(value: Option[Double])
Java
sourcepublic static final class ReadTemperature {}

public static final class RespondTemperature {
  final Optional<Double> value;

  public RespondTemperature(Optional<Double> value) {
    this.value = value;
  }
}

These two messages seem to cover the required functionality. However, the approach we choose must take into account the distributed nature of the application. While the basic mechanism is the same for communicating with an actor on the local JVM as with a remote actor, we need to keep the following in mind:

  • There will be observable differences in the latency of delivery between local and remote messages, because factors like network link bandwidth and the message size also come into play.
  • Reliability is a concern because a remote message send involves more steps, which means that more can go wrong.
  • A local send will pass a reference to the message inside the same JVM, without any restrictions on the underlying object which is sent, whereas a remote transport will place a limit on the message size.

In addition, while sending inside the same JVM is significantly more reliable, if an actor fails due to a programmer error while processing the message, the effect is the same as if a remote network request fails due to the remote host crashing while processing the message. Even though in both cases, the service recovers after a while (the actor is restarted by its supervisor, the host is restarted by an operator or by a monitoring system) individual requests are lost during the crash. Therefore, writing your actors such that every message could possibly be lost is the safe, pessimistic bet.

But to further understand the need for flexibility in the protocol, it will help to consider Akka message ordering and message delivery guarantees. Akka provides the following behavior for message sends:

  • At-most-once delivery, that is, no guaranteed delivery.
  • Message ordering is maintained per sender, receiver pair.

The following sections discuss this behavior in more detail:

Message delivery

The delivery semantics provided by messaging subsystems typically fall into the following categories:

  • At-most-once delivery — each message is delivered zero or one time; in more causal terms it means that messages can be lost, but are never duplicated.
  • At-least-once delivery — potentially multiple attempts are made to deliver each message, until at least one succeeds; again, in more causal terms this means that messages can be duplicated but are never lost.
  • Exactly-once delivery — each message is delivered exactly once to the recipient; the message can neither be lost nor be duplicated.

The first behavior, the one used by Akka, is the cheapest and results in the highest performance. It has the least implementation overhead because it can be done in a fire-and-forget fashion without keeping the state at the sending end or in the transport mechanism. The second, at-least-once, requires retries to counter transport losses. This adds the overhead of keeping the state at the sending end and having an acknowledgment mechanism at the receiving end. Exactly-once delivery is most expensive, and results in the worst performance: in addition to the overhead added by at-least-once delivery, it requires the state to be kept at the receiving end in order to filter out duplicate deliveries.

In an actor system, we need to determine exact meaning of a guarantee — at which point does the system consider the delivery as accomplished:

  1. When the message is sent out on the network?
  2. When the message is received by the target actor’s host?
  3. When the message is put into the target actor’s mailbox?
  4. When the message target actor starts to process the message?
  5. When the target actor has successfully processed the message?

Most frameworks and protocols that claim guaranteed delivery actually provide something similar to points 4 and 5. While this sounds reasonable, is it actually useful? To understand the implications, consider a simple, practical example: a user attempts to place an order and we only want to claim that it has successfully processed once it is actually on disk in the orders database.

If we rely on the successful processing of the message, the actor will report success as soon as the order has been submitted to the internal API that has the responsibility to validate it, process it and put it into the database. Unfortunately, immediately after the API has been invoked any of the following can happen:

  • The host can crash.
  • Deserialization can fail.
  • Validation can fail.
  • The database might be unavailable.
  • A programming error might occur.

This illustrates that the guarantee of delivery does not translate to the domain level guarantee. We only want to report success once the order has been actually fully processed and persisted. The only entity that can report success is the application itself, since only it has any understanding of the domain guarantees required. No generalized framework can figure out the specifics of a particular domain and what is considered a success in that domain.

In this particular example, we only want to signal success after a successful database write, where the database acknowledged that the order is now safely stored. For these reasons Akka lifts the responsibilities of guarantees to the application itself, i.e. you have to implement them yourself with the tools that Akka provides. This gives you full control of the guarantees that you want to provide. Now, let’s consider the message ordering that Akka provides to make it easy to reason about application logic.

Message Ordering

In Akka, for a given pair of actors, messages sent directly from the first to the second will not be received out-of-order. The word directly emphasizes that this guarantee only applies when sending with the tell operator directly to the final destination, but not when employing mediators.

If:

  • Actor A1 sends messages M1, M2, M3 to A2.
  • Actor A3 sends messages M4, M5, M6 to A2.

This means that, for Akka messages:

  • If M1 is delivered it must be delivered before M2 and M3.
  • If M2 is delivered it must be delivered before M3.
  • If M4 is delivered it must be delivered before M5 and M6.
  • If M5 is delivered it must be delivered before M6.
  • A2 can see messages from A1 interleaved with messages from A3.
  • Since there is no guaranteed delivery, any of the messages may be dropped, i.e. not arrive at A2.

These guarantees strike a good balance: having messages from one actor arrive in-order is convenient for building systems that can be easily reasoned about, while on the other hand allowing messages from different actors to arrive interleaved provides sufficient freedom for an efficient implementation of the actor system.

For the full details on delivery guarantees please refer to the reference page.

Adding flexibility to device messages

Our first query protocol was correct, but did not take into account distributed application execution. If we want to implement resends in the actor that queries a device actor (because of timed out requests), or if we want to query multiple actors, we need to be able to correlate requests and responses. Hence, we add one more field to our messages, so that an ID can be provided by the requester (we will add this code to our app in a later step):

Scala
sourcefinal case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
Java
sourcepublic static final class ReadTemperature {
  final long requestId;

  public ReadTemperature(long requestId) {
    this.requestId = requestId;
  }
}

public static final class RespondTemperature {
  final long requestId;
  final Optional<Double> value;

  public RespondTemperature(long requestId, Optional<Double> value) {
    this.requestId = requestId;
    this.value = value;
  }
}

Defining the device actor and its read protocol

As we learned in the Hello World example, each actor defines the type of messages it will accept. Our device actor has the responsibility to use the same ID parameter for the response of a given query, which would make it look like the following.

Scala
sourceimport akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }

}
Java
source
import java.util.Optional; import akka.actor.AbstractActor; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; class Device extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); final String groupId; final String deviceId; public Device(String groupId, String deviceId) { this.groupId = groupId; this.deviceId = deviceId; } public static Props props(String groupId, String deviceId) { return Props.create(Device.class, () -> new Device(groupId, deviceId)); } public static final class ReadTemperature { final long requestId; public ReadTemperature(long requestId) { this.requestId = requestId; } } public static final class RespondTemperature { final long requestId; final Optional<Double> value; public RespondTemperature(long requestId, Optional<Double> value) { this.requestId = requestId; this.value = value; } } Optional<Double> lastTemperatureReading = Optional.empty(); @Override public void preStart() { log.info("Device actor {}-{} started", groupId, deviceId); } @Override public void postStop() { log.info("Device actor {}-{} stopped", groupId, deviceId); } @Override public Receive createReceive() { return receiveBuilder() .match( ReadTemperature.class, r -> { getSender() .tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); }) .build(); } }

Note in the code that:

  • The companion objectstatic method defines how to construct a Device actor. The props parameters include an ID for the device and the group to which it belongs, which we will use later.
  • The companion objectclass includes the definitions of the messages we reasoned about previously.
  • In the Device class, the value of lastTemperatureReading is initially set to NoneOptional.empty(), and the actor will report it back if queried.

Testing the actor

Based on the simple actor above, we could write a simple test. You can check a full example of an Actor test in the Quickstart guide here Quickstart Guide Testing example Quickstart Guide Testing example. You’ll find there an example on how you can fully setup an Actor test, so that you can run it properly.

In the test tree of your project, add the following code to a DeviceSpec.scalaDeviceTest.java file. (We use ScalaTest but any other testing framework can be used with the Akka Testkit).

You can run this test by running mvn test or by running test at the sbt prompt.

Scala
source"reply with empty reading if no temperature is known" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
  val response = probe.expectMsgType[Device.RespondTemperature]
  response.requestId should ===(42L)
  response.value should ===(None)
}
Java
source@Test
public void testReplyWithEmptyReadingIfNoTemperatureIsKnown() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));
  deviceActor.tell(new Device.ReadTemperature(42L), probe.getRef());
  Device.RespondTemperature response = probe.expectMsgClass(Device.RespondTemperature.class);
  assertEquals(42L, response.requestId);
  assertEquals(Optional.empty(), response.value);
}

Now, the actor needs a way to change the state of the temperature when it receives a message from the sensor.

Adding a write protocol

The purpose of the write protocol is to update the currentTemperature field when the actor receives a message that contains the temperature. Again, it is tempting to define the write protocol as a very simple message, something like this:

Scala
sourcefinal case class RecordTemperature(value: Double)
Java
sourcepublic static final class RecordTemperature {
  final double value;

  public RecordTemperature(double value) {
    this.value = value;
  }
}

However, this approach does not take into account that the sender of the record temperature message can never be sure if the message was processed or not. We have seen that Akka does not guarantee delivery of these messages and leaves it to the application to provide success notifications. In our case, we would like to send an acknowledgment to the sender once we have updated our last temperature recording, e.g. final case class TemperatureRecorded(requestId: Long)TemperatureRecorded. Just like in the case of temperature queries and responses, it is a good idea to include an ID field to provide maximum flexibility.

Actor with read and write messages

Putting the read and write protocol together, the device actor looks like the following example:

Scala
sourceimport akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class RecordTemperature(requestId: Long, value: Double)
  final case class TemperatureRecorded(requestId: Long)

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._
  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    case RecordTemperature(id, value) =>
      log.info("Recorded temperature reading {} with {}", value, id)
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
Java
source
import java.util.Optional; import akka.actor.AbstractActor; import akka.actor.AbstractActor.Receive; import akka.actor.Props; import akka.event.Logging; import akka.event.LoggingAdapter; public class Device extends AbstractActor { private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this); final String groupId; final String deviceId; public Device(String groupId, String deviceId) { this.groupId = groupId; this.deviceId = deviceId; } public static Props props(String groupId, String deviceId) { return Props.create(Device.class, () -> new Device(groupId, deviceId)); } public static final class RecordTemperature { final long requestId; final double value; public RecordTemperature(long requestId, double value) { this.requestId = requestId; this.value = value; } } public static final class TemperatureRecorded { final long requestId; public TemperatureRecorded(long requestId) { this.requestId = requestId; } } public static final class ReadTemperature { final long requestId; public ReadTemperature(long requestId) { this.requestId = requestId; } } public static final class RespondTemperature { final long requestId; final Optional<Double> value; public RespondTemperature(long requestId, Optional<Double> value) { this.requestId = requestId; this.value = value; } } Optional<Double> lastTemperatureReading = Optional.empty(); @Override public void preStart() { log.info("Device actor {}-{} started", groupId, deviceId); } @Override public void postStop() { log.info("Device actor {}-{} stopped", groupId, deviceId); } @Override public Receive createReceive() { return receiveBuilder() .match( RecordTemperature.class, r -> { log.info("Recorded temperature reading {} with {}", r.value, r.requestId); lastTemperatureReading = Optional.of(r.value); getSender().tell(new TemperatureRecorded(r.requestId), getSelf()); }) .match( ReadTemperature.class, r -> { getSender() .tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf()); }) .build(); } }

We should also write a new test case now, exercising both the read/query and write/record functionality together:

Scala
source"reply with latest temperature reading" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))

  deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
  val response1 = probe.expectMsgType[Device.RespondTemperature]
  response1.requestId should ===(2L)
  response1.value should ===(Some(24.0))

  deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 3))

  deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
  val response2 = probe.expectMsgType[Device.RespondTemperature]
  response2.requestId should ===(4L)
  response2.value should ===(Some(55.0))
}
Java
source@Test
public void testReplyWithLatestTemperatureReading() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new Device.RecordTemperature(1L, 24.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);

  deviceActor.tell(new Device.ReadTemperature(2L), probe.getRef());
  Device.RespondTemperature response1 = probe.expectMsgClass(Device.RespondTemperature.class);
  assertEquals(2L, response1.requestId);
  assertEquals(Optional.of(24.0), response1.value);

  deviceActor.tell(new Device.RecordTemperature(3L, 55.0), probe.getRef());
  assertEquals(3L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);

  deviceActor.tell(new Device.ReadTemperature(4L), probe.getRef());
  Device.RespondTemperature response2 = probe.expectMsgClass(Device.RespondTemperature.class);
  assertEquals(4L, response2.requestId);
  assertEquals(Optional.of(55.0), response2.value);
}

What’s Next?

So far, we have started designing our overall architecture, and we wrote the first actor that directly corresponds to the domain. We now have to create the component that is responsible for maintaining groups of devices and the device actors themselves.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.