Part 4: Working with Device Groups

Dependency

Add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}

Introduction

Let’s take a closer look at the main functionality required by our use case. In a complete IoT system for monitoring home temperatures, the steps for connecting a device sensor to our system might look like this:

  1. A sensor device in the home connects through some protocol.
  2. The component managing network connections accepts the connection.
  3. The sensor provides its group and device ID to register with the device manager component of our system.
  4. The device manager component handles registration by looking up or creating the actor responsible for keeping sensor state.
  5. The actor responds with an acknowledgement, exposing its ActorRef.
  6. The networking component now uses the ActorRef for communication between the sensor and device actor without going through the device manager.

Steps 1 and 2 take place outside the boundaries of our tutorial system. In this chapter, we will start addressing steps 3-6 and create a way for sensors to register with our system and to communicate with actors. But first, we have another architectural decision — how many levels of actors should we use to represent device groups and device sensors?

One of the main design challenges for Akka programmers is choosing the best granularity for actors. In practice, depending on the characteristics of the interactions between actors, there are usually several valid ways to organize a system. In our use case, for example, it would be possible to have a single actor maintain all the groups and devices — perhaps using hash maps. It would also be reasonable to have an actor for each group that tracks the state of all devices in the same home.

The following guidelines help us choose the most appropriate actor hierarchy:

  • In general, prefer larger granularity. Introducing more fine-grained actors than needed causes more problems than it solves.
  • Add finer granularity when the system requires:
    • Higher concurrency.
    • Complex conversations between actors that have many states. We will see a very good example of this in the next chapter.
    • Sufficient state that it makes sense to divide into smaller actors.
    • Multiple unrelated responsibilities. Using separate actors allows individual actors to fail and be restored with little impact on others.

Device manager hierarchy

Considering the principles outlined in the previous section, we will model the device manager component as an actor tree with three levels:

  • The top level supervisor actor represents the system component for devices. It is also the entry point to look up and create device group and device actors.
  • At the next level, group actors each supervise the device actors for one group id (e.g. one home). They also provide services, such as querying temperature readings from all of the available devices in their group.
  • Device actors manage all the interactions with the actual device sensors, such as storing temperature readings.

[Figure: device manager tree]

We chose this three-layered architecture for these reasons:

  • Having groups of individual actors:

    • Isolates failures that occur in a group. If a single actor managed all device groups, an error in one group that causes a restart would wipe out the state of groups that are otherwise non-faulty.
    • Simplifies the problem of querying all the devices belonging to a group. Each group actor only contains state related to its group.
    • Increases parallelism in the system. Since each group has a dedicated actor, they run concurrently and we can query multiple groups concurrently.
  • Having sensors modeled as individual device actors:

    • Isolates failures of one device actor from the rest of the devices in the group.
    • Increases the parallelism of collecting temperature readings. Network connections from different sensors communicate with their individual device actors directly, reducing contention points.

With the architecture defined, we can start working on the protocol for registering sensors.

The Registration Protocol

As the first step, we need to design the protocol both for registering a device and for creating the group and device actors that will be responsible for it. This protocol will be provided by the DeviceManager component itself because that is the only actor that is known and available up front: device groups and device actors are created on-demand.

Looking at registration in more detail, we can outline the necessary functionality:

  1. When a DeviceManager receives a request with a group and device id:
    • If the manager already has an actor for the device group, it forwards the request to it.
    • Otherwise, it creates a new device group actor and then forwards the request.
  2. The DeviceGroup actor receives the request to register an actor for the given device:
    • If the group already has an actor for the device, the group actor forwards the request to the device actor.
    • Otherwise, the DeviceGroup actor first creates a device actor and then forwards the request.
  3. The device actor receives the request and sends an acknowledgement to the original sender. Since the device actor acknowledges receipt (instead of the group actor), the sensor will now have the ActorRef to send messages directly to its actor.
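
The look-up-or-create flow in the steps above can be modelled without Akka to see the routing logic in isolation. The following is a minimal sketch in plain Java, not the tutorial's implementation: the class names (RegistrationFlowSketch, Manager, Group, Device) are hypothetical, ordinary method calls stand in for forwarded messages, and the returned Device object stands in for the ActorRef exposed by the acknowledgement.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Akka-free model of the three registration steps.
// Method calls stand in for forwarded messages; Device objects stand in for ActorRefs.
final class RegistrationFlowSketch {

  static final class Device {
    final String groupId;
    final String deviceId;

    Device(String groupId, String deviceId) {
      this.groupId = groupId;
      this.deviceId = deviceId;
    }

    // Step 3: the device itself acknowledges, so the caller learns its identity.
    Device register() {
      return this;
    }
  }

  static final class Group {
    final String groupId;
    final Map<String, Device> devices = new HashMap<>();

    Group(String groupId) {
      this.groupId = groupId;
    }

    // Step 2: reuse the device for this ID, or create it first, then pass the request on.
    Device register(String deviceId) {
      return devices.computeIfAbsent(deviceId, id -> new Device(groupId, id)).register();
    }
  }

  static final class Manager {
    final Map<String, Group> groups = new HashMap<>();

    // Step 1: reuse the group for this ID, or create it first, then pass the request on.
    Device register(String groupId, String deviceId) {
      return groups.computeIfAbsent(groupId, Group::new).register(deviceId);
    }
  }

  public static void main(String[] args) {
    Manager manager = new Manager();
    Device first = manager.register("home-1", "thermostat");
    Device again = manager.register("home-1", "thermostat");
    Device other = manager.register("home-2", "thermostat");
    System.out.println(first == again); // true: the existing device is reused
    System.out.println(first == other); // false: different groups own different devices
  }
}
```

In the actor version, the same decisions are made inside message handlers, and the acknowledgement travels back as a message rather than as a return value.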

The messages that we will use to communicate registration requests and their acknowledgement have a simple definition:

Scala
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
Java
public static final class RequestTrackDevice {
  public final String groupId;
  public final String deviceId;

  public RequestTrackDevice(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }
}

public static final class DeviceRegistered {}

In this case we have not included a request ID field in the messages. Since registration happens once, when the component connects the system to some network protocol, the ID is not important. However, it is usually a best practice to include a request ID.
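
To illustrate why a request ID is usually worthwhile, the sketch below shows a caller matching replies to requests when the replies arrive out of order. It is a plain-Java illustration under stated assumptions: the names RequestIdSketch, Reply, send, and receive are hypothetical and do not appear in the tutorial code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: correlating replies with requests by ID.
final class RequestIdSketch {

  static final class Reply {
    final long requestId;
    final double value;

    Reply(long requestId, double value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  // Pending requests, keyed by request ID; the value describes what was asked.
  private final Map<Long, String> pending = new HashMap<>();
  private long nextId = 0;

  // Records an outgoing request and returns the ID to embed in the message.
  long send(String description) {
    long id = nextId++;
    pending.put(id, description);
    return id;
  }

  // Matches a reply to its request; empty if the ID is unknown or already handled.
  Optional<String> receive(Reply reply) {
    return Optional.ofNullable(pending.remove(reply.requestId));
  }

  public static void main(String[] args) {
    RequestIdSketch client = new RequestIdSketch();
    long kitchen = client.send("read kitchen sensor");
    long attic = client.send("read attic sensor");

    // Replies arrive in the opposite order to the requests.
    System.out.println(client.receive(new Reply(attic, 17.5)));   // Optional[read attic sensor]
    System.out.println(client.receive(new Reply(kitchen, 21.0))); // Optional[read kitchen sensor]
    System.out.println(client.receive(new Reply(kitchen, 21.0))); // Optional.empty (duplicate)
  }
}
```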

Now, we’ll start implementing the protocol from the bottom up. In practice, both a top-down and bottom-up approach can work, but in our case, we benefit from the bottom-up approach as it allows us to immediately write tests for the new features without mocking out parts that we will need to build later.

Adding registration support to device actors

At the bottom of our hierarchy are the Device actors. Their job in the registration process is simple: reply to the registration request with an acknowledgment to the sender. It is also prudent to add a safeguard against requests that come with a mismatched group or device ID.

We will assume that the ID of the sender of the registration message is preserved in the upper layers. We will show you in the next section how this can be achieved.

The device actor registration code looks like the following. Modify your example to match.

Scala
object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class RecordTemperature(requestId: Long, value: Double)
  final case class TemperatureRecorded(requestId: Long)

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)

  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
      sender() ! DeviceManager.DeviceRegistered

    case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
        groupId,
        deviceId,
        this.groupId,
        this.deviceId)

    case RecordTemperature(id, value) =>
      log.info("Recorded temperature reading {} with {}", value, id)
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
Java
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;

import java.util.Optional;

public class Device extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  final String deviceId;

  public Device(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }

  public static Props props(String groupId, String deviceId) {
    return Props.create(Device.class, () -> new Device(groupId, deviceId));
  }

  public static final class RecordTemperature {
    final long requestId;
    final double value;

    public RecordTemperature(long requestId, double value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  public static final class TemperatureRecorded {
    final long requestId;

    public TemperatureRecorded(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReadTemperature {
    final long requestId;

    public ReadTemperature(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondTemperature {
    final long requestId;
    final Optional<Double> value;

    public RespondTemperature(long requestId, Optional<Double> value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  Optional<Double> lastTemperatureReading = Optional.empty();

  @Override
  public void preStart() {
    log.info("Device actor {}-{} started", groupId, deviceId);
  }

  @Override
  public void postStop() {
    log.info("Device actor {}-{} stopped", groupId, deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            RequestTrackDevice.class,
            r -> {
              if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
                getSender().tell(new DeviceRegistered(), getSelf());
              } else {
                log.warning(
                    "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
                    r.groupId,
                    r.deviceId,
                    this.groupId,
                    this.deviceId);
              }
            })
        .match(
            RecordTemperature.class,
            r -> {
              log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
              lastTemperatureReading = Optional.of(r.value);
              getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
            })
        .match(
            ReadTemperature.class,
            r -> {
              getSender()
                  .tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
            })
        .build();
  }
}
Note

In the Scala version, we used a feature of Scala pattern matching that checks whether a field equals an expected value. By enclosing a variable name in backticks, like `variable`, the pattern will only match if it contains the value of variable in that position.

We can now write two new test cases, one exercising successful registration, the other testing the case when IDs don’t match:

Scala
"reply to registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  probe.lastSender should ===(deviceActor)
}

"ignore wrong registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
  probe.expectNoMessage(500.milliseconds)

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.ref)
  probe.expectNoMessage(500.milliseconds)
}
Java
@Test
public void testReplyToRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  assertEquals(deviceActor, probe.getLastSender());
}

@Test
public void testIgnoreWrongRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
  probe.expectNoMessage();

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
  probe.expectNoMessage();
}
Note

We used the expectNoMessage() helper method from TestProbe (Scala) or TestKit (Java). This assertion waits until the defined time limit and fails if it receives any messages during this period. If no messages are received during the waiting period, the assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because they add significant test execution time.
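
The cost mentioned in this note can be seen in a small model. The sketch below is plain Java and is not how TestKit is implemented: a hypothetical expectNoMessage helper polls a queue (standing in for the probe's mailbox) for the whole timeout, so every passing check costs the full waiting period, while a failing check returns as soon as a message is available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a "no message" assertion; not the TestKit implementation.
final class NoMessageSketch {

  // Passes (returns normally) only if nothing arrives within the timeout.
  static void expectNoMessage(BlockingQueue<Object> mailbox, long timeoutMillis)
      throws InterruptedException {
    Object received = mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    if (received != null) {
      throw new AssertionError("expected no message but received: " + received);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();

    long start = System.nanoTime();
    expectNoMessage(mailbox, 200); // passes, but only after waiting for the timeout
    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    System.out.println("passing check took about " + elapsedMillis + " ms");

    mailbox.add("unexpected");
    try {
      expectNoMessage(mailbox, 200); // fails right away: a message is already queued
    } catch (AssertionError e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A test suite with many such checks pays the timeout once per passing assertion, which is why short timeouts matter.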

Adding registration support to device group actors

We are done with registration support at the device level; now we have to implement it at the group level. A group actor has more work to do when it comes to registrations, including:

  • Handling the registration request by either forwarding it to an existing device actor or by creating a new actor and forwarding the message.
  • Tracking which device actors exist in the group and removing them from the group when they are stopped.

Handling the registration request

A device group actor must either forward the request to an existing child, or it should create one. To look up child actors by their device IDs we will use a Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java).

We also want to keep the ID of the original sender of the request so that our device actor can reply directly. This is possible by using forward instead of the ! operator (Scala) or the tell method (Java). The only difference between the two is that forward keeps the original sender, while ! or tell sets the sender to be the current actor. Just like with our device actor, we ensure that we don’t respond to wrong group IDs. Add the following to your source file:

Scala
object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)
  }
}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .build();
  }
}
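
The effect of forwarding rather than telling comes down to which sender the recipient sees. The following plain-Java sketch models only that difference. It does not use Akka, and the names ForwardSketch, Envelope, and Intermediary are hypothetical: an Envelope pairs a message with the sender the recipient will observe, and the Intermediary plays the role of the group actor.

```java
// Hypothetical, Akka-free model of sender handling; names are illustrative only.
final class ForwardSketch {

  // A message together with the sender the recipient will see.
  static final class Envelope {
    final String message;
    final String sender;

    Envelope(String message, String sender) {
      this.message = message;
      this.sender = sender;
    }
  }

  // An intermediary (like the group actor) that passes messages on to a recipient.
  static final class Intermediary {
    final String name;

    Intermediary(String name) {
      this.name = name;
    }

    // "tell": the intermediary becomes the sender, so replies come back to it.
    Envelope tell(Envelope incoming) {
      return new Envelope(incoming.message, name);
    }

    // "forward": the original sender is preserved, so replies bypass the intermediary.
    Envelope forward(Envelope incoming) {
      return new Envelope(incoming.message, incoming.sender);
    }
  }

  public static void main(String[] args) {
    Envelope request = new Envelope("RequestTrackDevice", "sensor-connection");
    Intermediary group = new Intermediary("group");

    System.out.println(group.tell(request).sender);    // group
    System.out.println(group.forward(request).sender); // sensor-connection
  }
}
```

Because the device actor replies to whatever sender it sees, forwarding is what lets the acknowledgement reach the original requester directly.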

Just as we did with the device, we test this new functionality. We also test that the actors returned for the two different IDs are actually different, and we attempt to record a temperature reading for each of the devices to see if the actors are responding.

Scala
"be able to register a device actor" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender
  deviceActor1 should !==(deviceActor2)

  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}

"ignore requests for wrong groupId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
  probe.expectNoMessage(500.milliseconds)
}
Java
@Test
public void testRegisterDeviceActor() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertNotEquals(deviceActor1, deviceActor2);

  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
}

@Test
public void testIgnoreRequestsForWrongGroupId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
  probe.expectNoMessage();
}

If a device actor already exists for the registration request, we would like to use the existing actor instead of a new one. We have not tested this yet, so we need to fix this:

Scala
"return same actor for same deviceId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender

  deviceActor1 should ===(deviceActor2)
}
Java
@Test
public void testReturnSameActorForSameDeviceId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertEquals(deviceActor1, deviceActor2);
}

Keeping track of the device actors in the group

So far, we have implemented logic for registering device actors in the group. Devices come and go, however, so we will need a way to remove device actors from the Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java). We will assume that when a device is removed, its corresponding device actor is stopped. Supervision, as we discussed earlier, only handles error scenarios, not graceful stopping. So we need to notify the parent when one of the device actors is stopped.

Akka provides a Death Watch feature that allows an actor to watch another actor and be notified if the other actor is stopped. Unlike supervision, watching is not limited to parent-child relationships: any actor can watch any other actor as long as it knows the ActorRef. After a watched actor stops, the watcher receives a Terminated(actorRef) message, which also contains the reference to the watched actor. The watcher can either handle this message explicitly or fail with a DeathPactException. The latter is useful if the actor can no longer perform its own duties after the watched actor has been stopped. In our case, the group should still function after one device has been stopped, so we need to handle the Terminated(actorRef) message.

Our device group actor needs to include functionality that:

  1. Starts watching new device actors when they are created.
  2. Removes a device actor from the Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java), which maps device IDs to device actors, when the notification indicates it has stopped.

Unfortunately, the Terminated message only contains the ActorRef of the child actor. We need the actor’s ID to remove it from the map of device IDs to device actors. To be able to do this removal, we need to introduce a second map, Map[ActorRef, String] (Scala) or Map<ActorRef, String> (Java), that allows us to find the device ID corresponding to a given ActorRef.
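
The bookkeeping described here, two maps kept in step with each other, can be sketched in isolation. This is a plain-Java model under stated assumptions rather than the actor code: the class DeviceRegistrySketch and its methods are hypothetical, plain Object handles stand in for ActorRefs, and onTerminated is called directly instead of being delivered as a Terminated message.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the group's bookkeeping; Object handles stand in for ActorRefs.
final class DeviceRegistrySketch {
  private final Map<String, Object> deviceIdToHandle = new HashMap<>();
  private final Map<Object, String> handleToDeviceId = new HashMap<>();

  // Registers a handle under a device ID, updating both directions together.
  void register(String deviceId, Object handle) {
    deviceIdToHandle.put(deviceId, handle);
    handleToDeviceId.put(handle, deviceId);
  }

  // A termination notice carries only the handle, so the reverse map supplies the ID.
  void onTerminated(Object handle) {
    String deviceId = handleToDeviceId.remove(handle);
    if (deviceId != null) {
      deviceIdToHandle.remove(deviceId);
    }
  }

  // Returns a copy so callers cannot observe later changes.
  Set<String> deviceIds() {
    return new HashSet<>(deviceIdToHandle.keySet());
  }

  public static void main(String[] args) {
    DeviceRegistrySketch registry = new DeviceRegistrySketch();
    Object device1 = new Object();
    Object device2 = new Object();
    registry.register("device1", device1);
    registry.register("device2", device2);

    registry.onTerminated(device1);
    System.out.println(registry.deviceIds()); // [device2]
  }
}
```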

Adding the functionality to identify the actor results in this:

Scala
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId
  }
}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}

So far we have no means of finding out which devices the device group actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability (the message RequestDeviceList, which carries a requestId) that lists the currently active device IDs:

Scala
object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))

  final case class RequestDeviceList(requestId: Long)
  final case class ReplyDeviceList(requestId: Long, ids: Set[String])
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)

    case RequestDeviceList(requestId) =>
      sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId

  }
}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  public static final class RequestDeviceList {
    final long requestId;

    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;

    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  private void onDeviceList(RequestDeviceList r) {
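    // Copy the keys (needs java.util.HashSet): keySet() is a live view of the mutable map
    // and should not be shared outside the actor.
    getSender()
        .tell(new ReplyDeviceList(r.requestId, new HashSet<>(deviceIdToActor.keySet())), getSelf());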
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .match(RequestDeviceList.class, this::onDeviceList)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}

We are almost ready to test the removal of devices. But we still need the following capabilities:

  • To stop a device actor from our test case. From the outside, any actor can be stopped by sending it the special built-in message PoisonPill, which instructs the actor to stop.
  • To be notified once the device actor is stopped. We can use the Death Watch facility for this purpose, too. The TestProbe (Scala) or TestKit (Java) has two methods that we can easily use: watch() to watch a specific actor, and expectTerminated to assert that the watched actor has been terminated.

We add two more test cases now. In the first, we test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:

Scala
"be able to list active devices" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}

"be able to list active devices after one shuts down" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val toShutDown = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))

  probe.watch(toShutDown)
  toShutDown ! PoisonPill
  probe.expectTerminated(toShutDown)

  // Use awaitAssert to retry: the groupActor may see the Terminated message
  // later than the probe does, because the order of delivery is undefined
  probe.awaitAssert {
    groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
    probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
  }
}
Java
@Test
public void testListActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
}

@Test
public void testListActiveDevicesAfterOneShutsDown() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef toShutDown = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);

  probe.watch(toShutDown);
  toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
  probe.expectTerminated(toShutDown);

  // Use awaitAssert to retry: the groupActor may see the Terminated message
  // later than the probe does, because the order of delivery is undefined
  probe.awaitAssert(
      () -> {
        groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
        DeviceGroup.ReplyDeviceList r = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
        assertEquals(1L, r.requestId);
        assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
        return null;
      });
}
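
The second test wraps its final check in awaitAssert because both the probe and the group actor watch the device, and nothing guarantees which of them processes its Terminated notification first. The underlying idea, retrying an assertion until it passes or a deadline expires, can be sketched in plain Java without Akka. This is a simplified illustration under that assumption, not Akka's implementation; RetryAssert and attemptsUntilSuccess are hypothetical names.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative only: a hand-rolled retry loop, not Akka's awaitAssert implementation.
final class RetryAssert {
  private RetryAssert() {}

  // Re-runs `assertion` every `interval` until it stops throwing AssertionError.
  // If the time budget `max` runs out first, the last failure is rethrown.
  static <T> T awaitAssert(Duration max, Duration interval, Supplier<T> assertion) {
    long deadline = System.nanoTime() + max.toNanos();
    while (true) {
      try {
        return assertion.get();
      } catch (AssertionError e) {
        long remaining = deadline - System.nanoTime();
        if (remaining < interval.toNanos()) {
          throw e; // no time left for another attempt
        }
        try {
          Thread.sleep(interval.toMillis());
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e;
        }
      }
    }
  }

  // Hypothetical stand-in for an eventually consistent check: fails twice, then passes.
  static int attemptsUntilSuccess() {
    AtomicInteger attempts = new AtomicInteger();
    return awaitAssert(
        Duration.ofSeconds(1),
        Duration.ofMillis(10),
        () -> {
          int n = attempts.incrementAndGet();
          if (n < 3) {
            throw new AssertionError("not yet consistent, attempt " + n);
          }
          return n;
        });
  }
}
```

In this sketch attemptsUntilSuccess returns only after the third attempt. If the check never passed, the last AssertionError would propagate once the one-second budget was spent, which is how a genuinely broken removal would still fail the test.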

Creating device manager actors

Going up to the next level in our hierarchy, we need to create the entry point for our device manager component in the DeviceManager source file. This actor is very similar to the device group actor, but creates device group actors instead of device actors:

Scala
object DeviceManager {
  def props(): Props = Props(new DeviceManager)

  final case class RequestTrackDevice(groupId: String, deviceId: String)
  case object DeviceRegistered
}

class DeviceManager extends Actor with ActorLogging {
  var groupIdToActor = Map.empty[String, ActorRef]
  var actorToGroupId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceManager started")

  override def postStop(): Unit = log.info("DeviceManager stopped")

  override def receive = {
    case trackMsg @ RequestTrackDevice(groupId, _) =>
      groupIdToActor.get(groupId) match {
        case Some(ref) =>
          ref.forward(trackMsg)
        case None =>
          log.info("Creating device group actor for {}", groupId)
          val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
          context.watch(groupActor)
          groupActor.forward(trackMsg)
          groupIdToActor += groupId -> groupActor
          actorToGroupId += groupActor -> groupId
      }

    case Terminated(groupActor) =>
      val groupId = actorToGroupId(groupActor)
      log.info("Device group actor for {} has been terminated", groupId)
      actorToGroupId -= groupActor
      groupIdToActor -= groupId

  }

}
Java
public class DeviceManager extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public static Props props() {
    return Props.create(DeviceManager.class, DeviceManager::new);
  }

  public static final class RequestTrackDevice {
    public final String groupId;
    public final String deviceId;

    public RequestTrackDevice(String groupId, String deviceId) {
      this.groupId = groupId;
      this.deviceId = deviceId;
    }
  }

  public static final class DeviceRegistered {}

  final Map<String, ActorRef> groupIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToGroupId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceManager started");
  }

  @Override
  public void postStop() {
    log.info("DeviceManager stopped");
  }

  private void onTrackDevice(RequestTrackDevice trackMsg) {
    String groupId = trackMsg.groupId;
    ActorRef ref = groupIdToActor.get(groupId);
    if (ref != null) {
      ref.forward(trackMsg, getContext());
    } else {
      log.info("Creating device group actor for {}", groupId);
      ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
      getContext().watch(groupActor);
      groupActor.forward(trackMsg, getContext());
      groupIdToActor.put(groupId, groupActor);
      actorToGroupId.put(groupActor, groupId);
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef groupActor = t.getActor();
    String groupId = actorToGroupId.get(groupActor);
    log.info("Device group actor for {} has been terminated", groupId);
    actorToGroupId.remove(groupActor);
    groupIdToActor.remove(groupId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(RequestTrackDevice.class, this::onTrackDevice)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}
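
The reason the manager keeps two maps is easiest to see with Akka removed: registration looks a child up by group ID, while a Terminated notification carries only the child's reference. The following is a minimal sketch of that bookkeeping, assuming plain strings in place of ActorRef; GroupRegistry and its method names are hypothetical and not part of the tutorial code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative only: plain strings stand in for ActorRef so the sketch runs without Akka.
final class GroupRegistry {
  private final Map<String, String> groupIdToHandle = new HashMap<>();
  private final Map<String, String> handleToGroupId = new HashMap<>();

  // Mirrors onTrackDevice: reuse the existing child, or create one and record it in both maps.
  String lookupOrCreate(String groupId) {
    String existing = groupIdToHandle.get(groupId);
    if (existing != null) {
      return existing;
    }
    String handle = "group-" + groupId; // hypothetical handle, modelled on the child actor name
    groupIdToHandle.put(groupId, handle);
    handleToGroupId.put(handle, groupId);
    return handle;
  }

  // Mirrors onTerminated: only the handle is known, so the reverse map finds the ID to remove.
  void onTerminated(String handle) {
    String groupId = handleToGroupId.remove(handle);
    if (groupId != null) {
      groupIdToHandle.remove(groupId);
    }
  }

  // Read-only view of the currently registered group IDs.
  Set<String> groupIds() {
    return Collections.unmodifiableSet(groupIdToHandle.keySet());
  }
}
```

Registering the same group twice yields the same handle, and calling onTerminated with that handle removes the entry from both maps, so a later registration for the same group creates a fresh child.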

We leave tests of the device manager as an exercise for you since it is very similar to the tests we have already written for the group actor.

What’s next?

We now have a hierarchical component for registering and tracking devices and recording measurements. We have seen how to implement different types of conversation patterns, such as:

  • Request-respond (for temperature recordings)
  • Delegate-respond (for registration of devices)
  • Create-watch-terminate (for creating the group and device actors as children)

In the next chapter, we will introduce group query capabilities, which will establish a new conversation pattern of scatter-gather. In particular, we will implement the functionality that allows users to query the status of all the devices belonging to a group.
