Part 3: Device Groups and Manager

In this chapter, we will integrate our device actors into a component that manages devices. When a new device comes online, there is no actor representing it. We need to be able to ask the device manager component to create a new device actor for us if necessary, in the required group (or return a reference to an already existing one).

Since we keep our tutorial system to the bare minimum, we have no actual component that interfaces with the external world via some networking protocol. For our exercise, we will just create the API necessary to integrate with such a component in the future. In a final system, the steps for connecting a device would look like this:

  1. The device connects through some protocol to our system.
  2. The component managing network connections accepts the connection.
  3. The ID of the device and the ID of the group that it belongs to are acquired.
  4. The device manager component is asked to create a group and device actor for the given IDs (or return an existing one).
  5. The device actor (either newly created or located) responds with an acknowledgment, at the same time exposing its ActorRef directly (by being the sender of the acknowledgment).
  6. The networking component now uses the ActorRef of the device directly, without going through the device manager component.

We are only concerned with steps 4 and 5 now. We will model the device manager component as an actor tree with three levels:

[Figure: device manager tree]

  • The top level is the supervisor actor representing the component. It is also the entry point to look up or create group and device actors.
  • Device group actors are supervisors of the devices belonging to the group. Apart from supervising the device actors they also provide extra services, like querying the temperature readings from all the devices available.
  • Device actors manage all the interactions with the actual devices, storing temperature readings for example.

When designing actor systems, one of the main challenges is deciding on the granularity of the actors. For example, it would be perfectly possible to have only a single actor maintaining all the groups and devices in HashMaps. It would also be reasonable to keep the groups as separate actors, but keep device state simply inside the group actor.

We chose this three-layered architecture for the following reasons:

  • Having groups as individual actors:
      • Allows us to isolate failures happening in a group. If a programmer error happened in a single actor that keeps all state, all of that state would be wiped out once the actor is restarted, affecting groups that are otherwise non-faulty.
      • Simplifies the problem of querying all the devices belonging to a group (since it only contains state related to the given group).
      • Increases the parallelism of the system by allowing multiple groups to be queried concurrently. Since groups have dedicated actors, all of them can run concurrently.
  • Having devices as individual actors:
      • Allows us to isolate failures happening in a device actor from the rest of the devices.
      • Increases the parallelism of collecting temperature readings, as actual network connections from different devices can talk to the individual device actors directly, reducing contention points.

In practice, a system can be organized in multiple ways, all depending on the characteristics of the interactions between actors.

The following guidelines help to arrive at the right granularity:

  • Prefer larger granularity to smaller. Introducing more fine-grained actors than needed causes more problems than it solves.
  • Prefer finer granularity if it enables higher concurrency in the system.
  • Prefer finer granularity if actors need to handle complex conversations with other actors and hence have many states. We will see a very good example of this in the next chapter.
  • Prefer finer granularity if there is too much state to keep around in one place compared to dividing into smaller actors.
  • Prefer finer granularity if the current actor has multiple unrelated responsibilities that can fail and be restored individually.

The Registration Protocol

As the first step, we need to design the protocol for registering a device and creating the actor that will be responsible for it. This protocol will be provided by the DeviceManager component itself because that is the only actor that is known up front: device groups and device actors are created on demand. The steps of registering a device are the following:

  1. DeviceManager receives the request to track a device for a given group and device.
  2. If the manager already has an actor for the device group, it forwards the request to it. Otherwise, it first creates a new one and then forwards the request.
  3. The DeviceGroup receives the request to register an actor for the given device.
  4. If the group already has an actor for the device, it forwards the request to it. Otherwise, it first creates a new one and then forwards the request.
  5. The device actor receives the request and acknowledges it to the original sender. Since the device actor is the sender of the acknowledgment, the receiver, i.e. the device, will be able to learn its ActorRef and send direct messages to its device actor in the future.
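
The look-up-or-create routing in the steps above can be sketched without any actors at all. The following plain-Java sketch is only an illustration under stated assumptions: ManagerSketch, GroupSketch and DeviceSketch are hypothetical stand-ins (not part of the tutorial or of Akka), and method calls replace message passing, so the acknowledgment of step 5 is modeled simply as returning the device object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a device actor: it only remembers its IDs.
final class DeviceSketch {
  final String groupId;
  final String deviceId;

  DeviceSketch(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }
}

// Hypothetical stand-in for a device group actor.
final class GroupSketch {
  final String groupId;
  private final Map<String, DeviceSketch> devices = new HashMap<>();

  GroupSketch(String groupId) {
    this.groupId = groupId;
  }

  // Steps 3-4: reuse the device if it is already known, otherwise create it first.
  DeviceSketch track(String deviceId) {
    return devices.computeIfAbsent(deviceId, id -> new DeviceSketch(groupId, id));
  }
}

// Hypothetical stand-in for the device manager, the only entry point known up front.
final class ManagerSketch {
  private final Map<String, GroupSketch> groups = new HashMap<>();

  // Steps 1-2: reuse the group if it is already known, otherwise create it first,
  // then hand the request down to the group.
  DeviceSketch track(String groupId, String deviceId) {
    return groups.computeIfAbsent(groupId, GroupSketch::new).track(deviceId);
  }
}
```

Calling track twice with the same pair of IDs yields the same DeviceSketch instance, which mirrors the "create or return an existing one" behavior the protocol asks for.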

Now that the steps are defined, we only need to define the messages that we will use to communicate requests and their acknowledgment:

final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered

As you can see, in this case we have not included a request ID field in the messages. Since registration usually happens once, at the component that connects the system to some network protocol, we will usually have no use for the ID. Nevertheless, it is a good exercise to add this ID.
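
As one possible take on that exercise, the messages could carry a request ID that the acknowledgment echoes back. The sketch below is an assumption, not the tutorial's protocol: the class names RequestTrackDeviceWithId, DeviceRegisteredWithId and RegistrationReplies are hypothetical, and the classes are plain Java with no Akka dependency.

```java
// Hypothetical variant of the registration request that also carries a request ID.
final class RequestTrackDeviceWithId {
  final long requestId;
  final String groupId;
  final String deviceId;

  RequestTrackDeviceWithId(long requestId, String groupId, String deviceId) {
    this.requestId = requestId;
    this.groupId = groupId;
    this.deviceId = deviceId;
  }
}

// Hypothetical variant of the acknowledgment that echoes the request ID.
final class DeviceRegisteredWithId {
  final long requestId;

  DeviceRegisteredWithId(long requestId) {
    this.requestId = requestId;
  }
}

final class RegistrationReplies {
  // Builds the acknowledgment for a request, copying its ID so that the
  // requester can correlate the reply with the request it sent.
  static DeviceRegisteredWithId acknowledge(RequestTrackDeviceWithId request) {
    return new DeviceRegisteredWithId(request.requestId);
  }
}
```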

Add Registration Support to Device Actor

We start implementing the protocol from the bottom first. In practice, both a top-down and bottom-up approach can work, but in our case, we benefit from the bottom-up approach as it allows us to immediately write tests for the new features without mocking out parts.

At the bottom of our hierarchy are the Device actors. Their job in this registration process is rather simple: just reply to the registration request with an acknowledgment to the sender. We will assume that the sender of the registration message is preserved in the upper layers. We will show you in the next section how this can be achieved.

We also add a safeguard against requests that come with a mismatched group or device ID. This is what the resulting code looks like:

Note

We used a feature of Scala pattern matching where we can match if a certain field equals an expected value. This is achieved by enclosing variables in backticks, like `variable`, and it means that the pattern only matches if it contains the value of variable in that position.

Scala
import akka.actor.{ Actor, ActorLogging, Props }
// Assumes DeviceManager (which defines the registration messages) is in the same package.
import DeviceManager.{ DeviceRegistered, RequestTrackDevice }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class RecordTemperature(requestId: Long, value: Double)
  final case class TemperatureRecorded(requestId: Long)

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)

  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    // Matches only when both IDs equal this actor's own IDs.
    case RequestTrackDevice(`groupId`, `deviceId`) =>
      sender() ! DeviceRegistered

    // Any other combination of IDs is logged and ignored.
    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
        groupId, deviceId, this.groupId, this.deviceId
      )

    case RecordTemperature(id, value) =>
      log.info("Recorded temperature reading {} with {}", value, id)
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
Java

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import jdocs.tutorial_3.DeviceManager.DeviceRegistered;
import jdocs.tutorial_3.DeviceManager.RequestTrackDevice;

import java.util.Optional;

public class Device extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  final String deviceId;

  public Device(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }

  public static Props props(String groupId, String deviceId) {
    return Props.create(Device.class, groupId, deviceId);
  }

  public static final class RecordTemperature {
    final long requestId;
    final double value;

    public RecordTemperature(long requestId, double value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  public static final class TemperatureRecorded {
    final long requestId;

    public TemperatureRecorded(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReadTemperature {
    final long requestId;

    public ReadTemperature(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondTemperature {
    final long requestId;
    final Optional<Double> value;

    public RespondTemperature(long requestId, Optional<Double> value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  Optional<Double> lastTemperatureReading = Optional.empty();

  @Override
  public void preStart() {
    log.info("Device actor {}-{} started", groupId, deviceId);
  }

  @Override
  public void postStop() {
    log.info("Device actor {}-{} stopped", groupId, deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
            .match(RequestTrackDevice.class, r -> {
              if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
                getSender().tell(new DeviceRegistered(), getSelf());
              } else {
                log.warning(
                        "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
                        r.groupId, r.deviceId, this.groupId, this.deviceId
                );
              }
            })
            .match(RecordTemperature.class, r -> {
              log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
              lastTemperatureReading = Optional.of(r.value);
              getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
            })
            .match(ReadTemperature.class, r -> {
              getSender().tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
            })
            .build();
  }
}

We should not leave features untested, so we immediately write two new test cases, one exercising successful registration, the other testing the case when IDs don’t match:

Note

We used the expectNoMsg() helper method from TestProbe (Scala) or TestKit (Java). This assertion waits until the defined time limit and fails if it receives any message during this period. If no messages are received during the waiting period, the assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because otherwise they add significant test execution time.

Scala
"reply to registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  probe.lastSender should ===(deviceActor)
}

"ignore wrong registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
  probe.expectNoMsg(500.milliseconds)

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.ref)
  probe.expectNoMsg(500.milliseconds)
}
Java
@Test
public void testReplyToRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  assertEquals(deviceActor, probe.getLastSender());
}

@Test
public void testIgnoreWrongRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
  probe.expectNoMsg();

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
  probe.expectNoMsg();
}
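
To make the cost of the no-message assertion used in these tests concrete, here is a minimal plain-Java sketch of the same idea: wait for the whole time limit and fail only if something arrives. It is not how the Akka TestKit is implemented; NoMessageAssertionSketch and its expectNoMessage method are hypothetical, and a BlockingQueue stands in for the probe's mailbox.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class NoMessageAssertionSketch {
  // Waits up to timeoutMillis for a message. Returns normally only if the whole
  // period elapses with nothing arriving; throws as soon as a message shows up.
  static void expectNoMessage(BlockingQueue<Object> mailbox, long timeoutMillis) {
    Object unexpected;
    try {
      unexpected = mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report the interruption as a failure.
      Thread.currentThread().interrupt();
      throw new AssertionError("Interrupted while waiting", e);
    }
    if (unexpected != null) {
      throw new AssertionError("Expected no message but received: " + unexpected);
    }
  }
}
```

Because a passing assertion always blocks for the full timeout, every such check adds its timeout to the run time of the test suite, which is why the note above recommends keeping the limit low.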

Device Group

We are done with registration support at the device level; now we have to implement it at the group level. A group has more work to do when it comes to registrations. It must either forward the request to an existing child or create one first. To be able to look up child actors by their device IDs, we will use a Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java).

We also want to keep the original sender of the request so that our device actor can reply directly. This is possible by using forward instead of the ! operator (Scala) or the tell method (Java). The only difference between the two is that forward keeps the original sender, while ! (or tell, as used here with getSelf()) sets the sender to be the current actor. Just like with our device actor, we ensure that we don’t respond to wrong group IDs:

Scala
object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor forward trackMsg
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor forward trackMsg
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
        groupId, this.groupId
      )
  }
}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, groupId);
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        // forward keeps the original sender, so the device replies to it directly
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
              "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
              trackMsg.groupId, this.groupId
      );
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
            .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
            .build();
  }
}
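
The difference between forwarding and telling described above comes down to which sender the receiver sees. The following plain-Java sketch illustrates only that bookkeeping; EnvelopeSketch and MailboxSketch are hypothetical names, senders are represented by simple strings, and nothing here uses Akka or reflects its internals.

```java
import java.util.ArrayList;
import java.util.List;

// A message paired with the sender that the receiver will see.
final class EnvelopeSketch {
  final Object message;
  final String sender;

  EnvelopeSketch(Object message, String sender) {
    this.message = message;
    this.sender = sender;
  }
}

// A named mailbox standing in for an actor; it only records what it receives.
final class MailboxSketch {
  final String name;
  final List<EnvelopeSketch> received = new ArrayList<>();

  MailboxSketch(String name) {
    this.name = name;
  }

  // tell: the receiver sees the sender that is passed in (normally the current actor).
  void tell(Object message, MailboxSketch sender) {
    received.add(new EnvelopeSketch(message, sender.name));
  }

  // forward: the receiver sees the sender recorded in the incoming envelope.
  void forward(EnvelopeSketch incoming) {
    received.add(new EnvelopeSketch(incoming.message, incoming.sender));
  }
}
```

If a probe tells a group, and the group forwards the resulting envelope to a device, the device sees the probe as sender; had the group used tell with itself as sender instead, the device would see the group and reply to it rather than to the probe.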

Just as we did with the device, we test this new functionality. We also test that the actors returned for the two different IDs are actually different, and we also attempt to record a temperature reading for each of the devices to see if the actors are responding.

Scala
"be able to register a device actor" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender
  deviceActor1 should !==(deviceActor2)

  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}

"ignore requests for wrong groupId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
  probe.expectNoMsg(500.milliseconds)
}
Java
@Test
public void testRegisterDeviceActor() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertNotEquals(deviceActor1, deviceActor2);

  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
}

@Test
public void testIgnoreRequestsForWrongGroupId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
  probe.expectNoMsg();
}

A device actor might already exist for the registration request. In this case, we would like to use the existing actor instead of creating a new one. We have not tested this yet, so we need to fix this:

Scala
"return same actor for same deviceId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender

  deviceActor1 should ===(deviceActor2)
}
Java
@Test
public void testReturnSameActorForSameDeviceId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertEquals(deviceActor1, deviceActor2);
}

So far, we have implemented everything for registering device actors in the group. Devices come and go, however, so we will need a way to remove them from the Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java). We will assume that when a device is removed, its corresponding device actor is simply stopped. We need some way for the parent to be notified when one of the device actors is stopped. Unfortunately, supervision will not help because it is used for error scenarios, not graceful stopping.

There is a feature in Akka that is exactly what we need here. It is possible for an actor to watch another actor and be notified if the other actor is stopped. This feature is called Death Watch, and it is an important tool for any Akka application. Unlike supervision, watching is not limited to parent-child relationships: any actor can watch any other actor given its ActorRef. After a watched actor stops, the watcher receives a Terminated(ref) message, which also contains the reference to the watched actor. The watcher can either handle this message explicitly or, if it does not handle it, it will fail with a DeathPactException. The latter is useful if the actor can no longer perform its duties after its collaborator actor has been stopped. In our case, the group should still function after one device has been stopped, so we need to handle this message. The steps we need to follow are the following:

  1. Whenever we create a new device actor, we must also watch it.
  2. When we are notified that a device actor has been stopped, we also need to remove it from the map which maps device IDs to device actors.

Unfortunately, the Terminated message contains only the ActorRef of the child actor, not its ID, which we need in order to remove it from the map of device IDs to device actors. To be able to do this removal, we need to introduce a second map, a Map[ActorRef, String] (Scala) or Map<ActorRef, String> (Java), that allows us to find the device ID corresponding to a given ActorRef. Putting this together, the result is:

Scala

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor forward trackMsg
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor forward trackMsg
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
        groupId, this.groupId
      )

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId
  }
}
Java

public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, groupId);
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        // Watch the new child so that we receive Terminated when it stops
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
              "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
              trackMsg.groupId, this.groupId
      );
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
            .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
            .match(Terminated.class, this::onTerminated)
            .build();
  }
}
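
The two-map bookkeeping can also be looked at in isolation from the actor code. The sketch below is a plain-Java illustration under stated assumptions: DeviceRegistrySketch is a hypothetical helper that is not part of the tutorial, the type parameter H stands in for ActorRef, and, unlike the actor code above, it guards against a handle that was never registered.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Keeps the forward and reverse mappings in step. H stands in for ActorRef.
final class DeviceRegistrySketch<H> {
  private final Map<String, H> deviceIdToHandle = new HashMap<>();
  private final Map<H, String> handleToDeviceId = new HashMap<>();

  // Called when a new device actor is created: record both directions.
  void register(String deviceId, H handle) {
    deviceIdToHandle.put(deviceId, handle);
    handleToDeviceId.put(handle, deviceId);
  }

  // Called on termination, when only the handle is known.
  // Returns the removed device ID, or null if the handle was not registered.
  String onTerminated(H handle) {
    String deviceId = handleToDeviceId.remove(handle);
    if (deviceId != null) {
      deviceIdToHandle.remove(deviceId);
    }
    return deviceId;
  }

  // Read-only view of the device IDs that are currently registered.
  Set<String> activeDeviceIds() {
    return Collections.unmodifiableSet(deviceIdToHandle.keySet());
  }
}
```

The reverse map is what makes the removal possible: the termination notice identifies the actor, and the reverse lookup turns that back into the device ID used as the key of the primary map.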

So far we have no means of finding out which devices the device group actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability (the message RequestDeviceList, which carries a requestId) that simply lists the currently active device IDs:

Scala
object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))

  final case class RequestDeviceList(requestId: Long)
  final case class ReplyDeviceList(requestId: Long, ids: Set[String])
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor forward trackMsg
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor forward trackMsg
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
        groupId, this.groupId
      )

    case RequestDeviceList(requestId) =>
      sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId

  }
}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, groupId);
  }

  public static final class RequestDeviceList {
    final long requestId;

    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;

    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor = getContext().actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      log.warning(
              "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
              trackMsg.groupId, this.groupId
      );
    }
  }

  private void onDeviceList(RequestDeviceList r) {
    getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
            .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
            .match(RequestDeviceList.class, this::onDeviceList)
            .match(Terminated.class, this::onTerminated)
            .build();
  }
}

We almost have everything to test the removal of devices. What is missing is:

  • Stopping a device actor from our test case, from the outside: any actor can be stopped by simply sending it the special built-in message PoisonPill, which instructs the actor to stop.
  • Being notified once the device actor is stopped: we can use the Death Watch facility for this purpose, too. Thankfully, TestProbe (Scala) or TestKit (Java) has two methods that we can easily use: watch() to watch a specific actor, and expectTerminated to assert that the watched actor has been terminated.

We add two more test cases now. In the first, we just test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:

Scala
"be able to list active devices" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}

"be able to list active devices after one shuts down" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val toShutDown = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))

  probe.watch(toShutDown)
  toShutDown ! PoisonPill
  probe.expectTerminated(toShutDown)

  // Retry with awaitAssert: the groupActor may see the Terminated message
  // later than the probe does, because that order is undefined.
  probe.awaitAssert {
    groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
    probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
  }
}
Java
@Test
public void testListActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
}

@Test
public void testListActiveDevicesAfterOneShutsDown() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef toShutDown = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);

  probe.watch(toShutDown);
  toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
  probe.expectTerminated(toShutDown);

  // Retry with awaitAssert: the groupActor may see the Terminated message
  // later than the probe does, because that order is undefined.
  probe.awaitAssert(() -> {
    groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
    DeviceGroup.ReplyDeviceList r =
      probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
    assertEquals(1L, r.requestId);
    assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
    return null;
  });
}
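
The second test wraps its final assertion in awaitAssert because the probe and the group actor each receive their own Terminated notification, and nothing guarantees which arrives first. The retry idea can be sketched in plain Java without Akka. This is only an illustration, not Akka's implementation: the class name AwaitAssertSketch, the explicit max and interval parameters, and the choice to retry only on AssertionError are assumptions made for this sketch.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, Akka-free illustration of "retry an assertion until it passes".
final class AwaitAssertSketch {

  // Re-runs `assertion` until it stops throwing AssertionError or `max` has elapsed.
  static void awaitAssert(Runnable assertion, Duration max, Duration interval) {
    long deadline = System.nanoTime() + max.toNanos();
    while (true) {
      try {
        assertion.run();
        return; // the assertion passed, stop retrying
      } catch (AssertionError e) {
        if (System.nanoTime() >= deadline) {
          throw e; // out of time: surface the last failure
        }
        try {
          Thread.sleep(interval.toMillis());
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e; // interrupted while waiting: give up with the last failure
        }
      }
    }
  }

  public static void main(String[] args) {
    AtomicInteger attempts = new AtomicInteger();
    // The assertion fails twice, standing in for a group actor that has not
    // yet processed Terminated, and succeeds on the third attempt.
    awaitAssert(() -> {
      if (attempts.incrementAndGet() < 3) {
        throw new AssertionError("device list not updated yet");
      }
    }, Duration.ofSeconds(3), Duration.ofMillis(10));
    System.out.println("passed after " + attempts.get() + " attempts");
  }
}
```

Because every retry runs the whole block again, the request is re-sent on each attempt. That is why the test sends RequestDeviceList inside the awaitAssert body rather than before it.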

Device Manager

The only part that remains now is the entry point for our device manager component. This actor is very similar to the device group actor, the only difference being that it creates device group actors instead of device actors:

Scala
object DeviceManager {
  def props(): Props = Props(new DeviceManager)

  final case class RequestTrackDevice(groupId: String, deviceId: String)
  case object DeviceRegistered
}

class DeviceManager extends Actor with ActorLogging {
  var groupIdToActor = Map.empty[String, ActorRef]
  var actorToGroupId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceManager started")

  override def postStop(): Unit = log.info("DeviceManager stopped")

  override def receive = {
    case trackMsg @ RequestTrackDevice(groupId, _) =>
      groupIdToActor.get(groupId) match {
        case Some(ref) =>
          ref forward trackMsg
        case None =>
          log.info("Creating device group actor for {}", groupId)
          val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
          context.watch(groupActor)
          groupActor forward trackMsg
          groupIdToActor += groupId -> groupActor
          actorToGroupId += groupActor -> groupId
      }

    case Terminated(groupActor) =>
      val groupId = actorToGroupId(groupActor)
      log.info("Device group actor for {} has been terminated", groupId)
      actorToGroupId -= groupActor
      groupIdToActor -= groupId

  }

}
Java
public class DeviceManager extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public static Props props() {
    return Props.create(DeviceManager.class);
  }

  public static final class RequestTrackDevice {
    public final String groupId;
    public final String deviceId;

    public RequestTrackDevice(String groupId, String deviceId) {
      this.groupId = groupId;
      this.deviceId = deviceId;
    }
  }

  public static final class DeviceRegistered {
  }

  final Map<String, ActorRef> groupIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToGroupId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceManager started");
  }

  @Override
  public void postStop() {
    log.info("DeviceManager stopped");
  }

  private void onTrackDevice(RequestTrackDevice trackMsg) {
    String groupId = trackMsg.groupId;
    ActorRef ref = groupIdToActor.get(groupId);
    if (ref != null) {
      ref.forward(trackMsg, getContext());
    } else {
      log.info("Creating device group actor for {}", groupId);
      ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
      getContext().watch(groupActor);
      groupActor.forward(trackMsg, getContext());
      groupIdToActor.put(groupId, groupActor);
      actorToGroupId.put(groupActor, groupId);
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef groupActor = t.getActor();
    String groupId = actorToGroupId.get(groupActor);
    log.info("Device group actor for {} has been terminated", groupId);
    actorToGroupId.remove(groupActor);
    groupIdToActor.remove(groupId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
            .match(RequestTrackDevice.class, this::onTrackDevice)
            .match(Terminated.class, this::onTerminated)
            .build();
  }

}
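
The manager's bookkeeping relies on two maps that must always change together: one for looking up a group by ID, and one for finding the ID again when a Terminated message arrives carrying only the actor reference. The following Akka-free sketch isolates that invariant. It is an illustration under assumptions, not part of the tutorial code: GroupRegistrySketch and its methods are hypothetical names, and plain String handles with a counter suffix stand in for ActorRef.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Akka-free model of the DeviceManager's two-map bookkeeping.
final class GroupRegistrySketch {
  // String handles stand in for ActorRef in this sketch.
  private final Map<String, String> groupIdToActor = new HashMap<>();
  private final Map<String, String> actorToGroupId = new HashMap<>();
  private int nextUid = 0; // makes each created handle distinct, like a new actor incarnation

  // Mirrors onTrackDevice: reuse the existing group handle or register a new one in both maps.
  String lookupOrCreate(String groupId) {
    String existing = groupIdToActor.get(groupId);
    if (existing != null) {
      return existing;
    }
    String handle = "group-" + groupId + "#" + nextUid++;
    groupIdToActor.put(groupId, handle);
    actorToGroupId.put(handle, groupId);
    return handle;
  }

  // Mirrors onTerminated: remove the entry from both maps, keyed by the handle.
  void onTerminated(String handle) {
    String groupId = actorToGroupId.remove(handle);
    if (groupId != null) {
      groupIdToActor.remove(groupId);
    }
  }

  int size() {
    return groupIdToActor.size();
  }

  // The invariant the manager depends on: both maps describe the same set of groups.
  boolean inSync() {
    return groupIdToActor.size() == actorToGroupId.size();
  }

  public static void main(String[] args) {
    GroupRegistrySketch registry = new GroupRegistrySketch();
    String first = registry.lookupOrCreate("g1");
    System.out.println(first.equals(registry.lookupOrCreate("g1"))); // same group, same handle
    registry.onTerminated(first);
    System.out.println(registry.size() + " groups, in sync: " + registry.inSync());
  }
}
```

Without the reverse map, handling termination would require scanning every entry of groupIdToActor to find the ID that belongs to the stopped actor.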

We leave testing the device manager as an exercise, since the tests are very similar to those we have written for the group actor.

What is Next?

We now have a hierarchical component for registering and tracking devices and recording measurements. We have seen some conversation patterns like:

  • Request-respond (for temperature recordings).
  • Delegate-respond (for registration of devices).
  • Create-watch-terminate (for creating the group and device actors as children).

In the next chapter, we will introduce group query capabilities, which will establish a new conversation pattern of scatter-gather. In particular, we will implement the functionality that allows users to query the status of all the devices belonging to a group.

The source code for this page can be found here.