Part 5: Querying Device Groups

The conversational patterns that we have seen so far are simple in the sense that they require the actor to keep little or no state. Specifically:

  • Device actors return a reading, which requires no state change
  • Record a temperature, which updates a single field
  • Device Group actors maintain group membership by simply adding or removing entries from a map

In this part, we will use a more complex example. Since homeowners will be interested in the temperatures throughout their home, our goal is to be able to query all of the device actors in a group. Let us start by investigating how such a query API should behave.

Dealing with possible scenarios

The very first issue we face is that the membership of a group is dynamic. Each sensor device is represented by an actor that can stop at any time. At the beginning of the query, we can ask all of the existing device actors for the current temperature. However, during the lifecycle of the query:

  • A device actor might stop and not be able to respond back with a temperature reading.
  • A new device actor might start up and not be included in the query because we weren’t aware of it.

These issues can be addressed in many different ways, but the important point is to settle on the desired behavior. The following works well for our use case:

  • When a query arrives, the group actor takes a snapshot of the existing device actors and will only ask those actors for the temperature.
  • Actors that start up after the query arrives are simply ignored.
  • If an actor in the snapshot stops during the query without answering, we will simply report the fact that it stopped to the sender of the query message.

Apart from device actors coming and going dynamically, some actors might take a long time to answer. For example, they could be stuck in an accidental infinite loop, or fail due to a bug and drop our request. We don’t want the query to continue indefinitely, so we will consider it complete in either of the following cases:

  • All actors in the snapshot have either responded or have confirmed being stopped.
  • We reach a pre-defined deadline.

Given these decisions, along with the fact that a device in the snapshot might have just started and not yet received a temperature to record, we can define four states for each device actor, with respect to a temperature query:

  • It has a temperature available: Temperature(value) Temperature.
  • It has responded, but has no temperature available yet: TemperatureNotAvailable.
  • It has stopped before answering: DeviceNotAvailable.
  • It did not respond before the deadline: DeviceTimedOut.

Summarizing these in message types we can add the following to DeviceGroup:

Scala
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])

sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
Java
public static final class RequestAllTemperatures {
  final long requestId;

  public RequestAllTemperatures(long requestId) {
    this.requestId = requestId;
  }
}

public static final class RespondAllTemperatures {
  final long requestId;
  final Map<String, TemperatureReading> temperatures;

  public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
    this.requestId = requestId;
    this.temperatures = temperatures;
  }
}

public static interface TemperatureReading {
}

public static final class Temperature implements TemperatureReading {
  public final double value;

  public Temperature(double value) {
    this.value = value;
  }
}

public static final class TemperatureNotAvailable implements TemperatureReading {
}

public static final class DeviceNotAvailable implements TemperatureReading {
}

public static final class DeviceTimedOut implements TemperatureReading {
}

Implementing the query

One approach for implementing the query involves adding code to the group device actor. However, in practice this can be very cumbersome and error prone. Remember that when we start a query, we need to take a snapshot of the devices present and start a timer so that we can enforce the deadline. In the meantime, another query can arrive. For the second query, of course, we need to keep track of the exact same information but in isolation from the previous query. This would require us to maintain separate mappings between queries and device actors.

Instead, we will implement a simpler, and superior approach. We will create an actor that represents a single query and that performs the tasks needed to complete the query on behalf of the group actor. So far we have created actors that belonged to classical domain objects, but now, we will create an actor that represents a process or a task rather than an entity. We benefit by keeping our group device actor simple and being able to better test query capability in isolation.

Defining the query actor

First, we need to design the lifecycle of our query actor. This consists of identifying its initial state, the first action it will take, and the cleanup — if necessary. The query actor will need the following information:

  • The snapshot and IDs of active device actors to query.
  • The ID of the request that started the query (so that we can include it in the reply).
  • The reference of the actor who sent the query. We will send the reply to this actor directly.
  • A deadline that indicates how long the query should wait for replies. Making this a parameter will simplify testing.

Scheduling the query timeout

Since we need a way to indicate how long we are willing to wait for responses, it is time to introduce a new Akka feature that we have not used yet, the built-in scheduler facility. Using the scheduler is simple:

  • We get the scheduler from the ActorSystem, which, in turn, is accessible from the actor’s context: context.system.schedulergetContext().getSystem().scheduler(). This needs an implicit ExecutionContext which is basically the thread-pool that will execute the timer task itself. In our case, we use the same dispatcher as the actor by importing import context.dispatcher passing in getContext().dispatcher().
  • The scheduler.scheduleOnce(time, actorRef, message) scheduler.scheduleOnce(time, actorRef, message, executor, sender) method will schedule the message message into the future by the specified time and send it to the actor actorRef.

We need to create a message that represents the query timeout. We create a simple message CollectionTimeout without any parameters for this purpose. The return value from scheduleOnce is a Cancellable which can be used to cancel the timer if the query finishes successfully in time. At the start of the query, we need to ask each of the device actors for the current temperature. To be able to quickly detect devices that stopped before they got the ReadTemperature message we will also watch each of the actors. This way, we get Terminated messages for those that stop during the lifetime of the query, so we don’t need to wait until the timeout to mark these as not available.

Putting this together, the outline of our DeviceGroupQuery actor looks like this:

Scala
object DeviceGroupQuery {
  case object CollectionTimeout

  def props(
    actorToDeviceId: Map[ActorRef, String],
    requestId:       Long,
    requester:       ActorRef,
    timeout:         FiniteDuration
  ): Props = {
    Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
  }
}

class DeviceGroupQuery(
  actorToDeviceId: Map[ActorRef, String],
  requestId:       Long,
  requester:       ActorRef,
  timeout:         FiniteDuration
) extends Actor with ActorLogging {
  import DeviceGroupQuery._
  import context.dispatcher
  val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)

  override def preStart(): Unit = {
    actorToDeviceId.keysIterator.foreach { deviceActor =>
      context.watch(deviceActor)
      deviceActor ! Device.ReadTemperature(0)
    }
  }

  override def postStop(): Unit = {
    queryTimeoutTimer.cancel()
  }

}
Java
public class DeviceGroupQuery extends AbstractActor {
  public static final class CollectionTimeout {
  }

  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final Map<ActorRef, String> actorToDeviceId;
  final long requestId;
  final ActorRef requester;

  Cancellable queryTimeoutTimer;

  public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
    this.actorToDeviceId = actorToDeviceId;
    this.requestId = requestId;
    this.requester = requester;

    queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
            timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
    );
  }

  public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
    return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
  }

  @Override
  public void preStart() {
    for (ActorRef deviceActor : actorToDeviceId.keySet()) {
      getContext().watch(deviceActor);
      deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
    }
  }

  @Override
  public void postStop() {
    queryTimeoutTimer.cancel();
  }

}

Tracking actor state

The query actor, apart from the pending timer, has one stateful aspect, tracking the set of actors that: have replied, have stopped, or have not replied. One way to track this state is to create a mutable field in the actor (a var). A different approach takes advantage of the ability to change how an actor responds to messages. A Receive is just a function (or an object, if you like) that can be returned from another function. By default, the receive block defines the behavior of the actor, but it is possible to change it multiple times during the life of the actor. We simply call context.become(newBehavior) where newBehavior is anything with type Receive (which is just a shorthand for PartialFunction[Any, Unit]). We will leverage this feature to track the state of our actor.

For our use case:

  1. Instead of defining receive directly, we delegate to a waitingForReplies function to create the Receive.
  2. The waitingForReplies function will keep track of two changing values:
  • a Map of already received replies
  • a Set of actors that we still wait on
  1. We have three events to act on:
  • We can receive a RespondTemperature message from one of the devices.
  • We can receive a Terminated message for a device actor that has been stopped in the meantime.
  • We can reach the deadline and receive a CollectionTimeout.

In the first two cases, we need to keep track of the replies, which we now simply delegate to a method receivedResponse, which we will discuss later. In the case of timeout, we need to simply take all the actors that have not yet replied yet (the members of the set stillWaiting) and put a DeviceTimedOut as the status in the final reply. Then we reply to the submitter of the query with the collected results and stop the query actor.

To accomplish this, add the following to your DeviceGroupQuery source file:

Scala
override def receive: Receive =
  waitingForReplies(
    Map.empty,
    actorToDeviceId.keySet
  )

def waitingForReplies(
  repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
  stillWaiting: Set[ActorRef]
): Receive = {
  case Device.RespondTemperature(0, valueOption) =>
    val deviceActor = sender()
    val reading = valueOption match {
      case Some(value) => DeviceGroup.Temperature(value)
      case None        => DeviceGroup.TemperatureNotAvailable
    }
    receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)

  case Terminated(deviceActor) =>
    receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)

  case CollectionTimeout =>
    val timedOutReplies =
      stillWaiting.map { deviceActor =>
        val deviceId = actorToDeviceId(deviceActor)
        deviceId -> DeviceGroup.DeviceTimedOut
      }
    requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
    context.stop(self)
}
Java
@Override
public Receive createReceive() {
  return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}

public Receive waitingForReplies(
        Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
        Set<ActorRef> stillWaiting) {
  return receiveBuilder()
          .match(Device.RespondTemperature.class, r -> {
            ActorRef deviceActor = getSender();
            DeviceGroup.TemperatureReading reading = r.value
                    .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
                    .orElse(new DeviceGroup.TemperatureNotAvailable());
            receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
          })
          .match(Terminated.class, t -> {
            receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
          })
          .match(CollectionTimeout.class, t -> {
            Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
            for (ActorRef deviceActor : stillWaiting) {
              String deviceId = actorToDeviceId.get(deviceActor);
              replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
            }
            requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
            getContext().stop(getSelf());
          })
          .build();
}

It is not yet clear how we will “mutate” the answersSoFar and stillWaiting data structures. One important thing to note is that the function waitingForReplies does not handle the messages directly. It returns a Receive function that will handle the messages. This means that if we call waitingForReplies again, with different parameters, then it returns a brand new Receive that will use those new parameters.

We have seen how we can install the initial Receive by simply returning it from receive. In order to install a new one, to record a new reply, for example, we need some mechanism. This mechanism is the method context.become(newReceive) which will change the actor’s message handling function to the provided newReceive function. You can imagine that before starting, your actor automatically calls context.become(receive), i.e. installing the Receive function that is returned from receive. This is another important observation: it is not receive that handles the messages, it just returns a Receive function that will actually handle the messages.

We now have to figure out what to do in receivedResponse. First, we need to record the new result in the map repliesSoFar and remove the actor from stillWaiting. The next step is to check if there are any remaining actors we are waiting for. If there is none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the repliesSoFar and stillWaiting structures and wait for more messages.

In the code before, we treated Terminated as the implicit response DeviceNotAvailable, so receivedResponse does not need to do anything special. However, there is one small task we still need to do. It is possible that we receive a proper response from a device actor, but then it stops during the lifetime of the query. We don’t want this second event to overwrite the already received reply. In other words, we don’t want to receive Terminated after we recorded the response. This is simple to achieve by calling context.unwatch(ref). This method also ensures that we don’t receive Terminated events that are already in the mailbox of the actor. It is also safe to call this multiple times, only the first call will have any effect, the rest is simply ignored.

With all this knowledge, we can create the receivedResponse method:

Scala
def receivedResponse(
  deviceActor:  ActorRef,
  reading:      DeviceGroup.TemperatureReading,
  stillWaiting: Set[ActorRef],
  repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
  context.unwatch(deviceActor)
  val deviceId = actorToDeviceId(deviceActor)
  val newStillWaiting = stillWaiting - deviceActor

  val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
  if (newStillWaiting.isEmpty) {
    requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
    context.stop(self)
  } else {
    context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
  }
}
Java
public void receivedResponse(ActorRef deviceActor,
                             DeviceGroup.TemperatureReading reading,
                             Set<ActorRef> stillWaiting,
                             Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
  getContext().unwatch(deviceActor);
  String deviceId = actorToDeviceId.get(deviceActor);

  Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
  newStillWaiting.remove(deviceActor);

  Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
  newRepliesSoFar.put(deviceId, reading);
  if (newStillWaiting.isEmpty()) {
    requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
    getContext().stop(getSelf());
  } else {
    getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
  }
}

It is quite natural to ask at this point, what have we gained by using the context.become() trick instead of just making the repliesSoFar and stillWaiting structures mutable fields of the actor (i.e. vars)? In this simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have more kinds of states. Since each state might have temporary data that is relevant itself, keeping these as fields would pollute the global state of the actor, i.e. it is unclear what fields are used in what state. Using parameterized Receive “factory” methods we can keep data private that is only relevant to the state. It is still a good exercise to rewrite the query using vars mutable fields instead of context.become(). However, it is recommended to get comfortable with the solution we have used here as it helps structuring more complex actor code in a cleaner and more maintainable way.

Our query actor is now done:

Scala
object DeviceGroupQuery {
  case object CollectionTimeout

  def props(
    actorToDeviceId: Map[ActorRef, String],
    requestId:       Long,
    requester:       ActorRef,
    timeout:         FiniteDuration
  ): Props = {
    Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
  }
}

class DeviceGroupQuery(
  actorToDeviceId: Map[ActorRef, String],
  requestId:       Long,
  requester:       ActorRef,
  timeout:         FiniteDuration
) extends Actor with ActorLogging {
  import DeviceGroupQuery._
  import context.dispatcher
  val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)

  override def preStart(): Unit = {
    actorToDeviceId.keysIterator.foreach { deviceActor =>
      context.watch(deviceActor)
      deviceActor ! Device.ReadTemperature(0)
    }
  }

  override def postStop(): Unit = {
    queryTimeoutTimer.cancel()
  }

  override def receive: Receive =
    waitingForReplies(
      Map.empty,
      actorToDeviceId.keySet
    )

  def waitingForReplies(
    repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
    stillWaiting: Set[ActorRef]
  ): Receive = {
    case Device.RespondTemperature(0, valueOption) =>
      val deviceActor = sender()
      val reading = valueOption match {
        case Some(value) => DeviceGroup.Temperature(value)
        case None        => DeviceGroup.TemperatureNotAvailable
      }
      receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)

    case Terminated(deviceActor) =>
      receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)

    case CollectionTimeout =>
      val timedOutReplies =
        stillWaiting.map { deviceActor =>
          val deviceId = actorToDeviceId(deviceActor)
          deviceId -> DeviceGroup.DeviceTimedOut
        }
      requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
      context.stop(self)
  }

  def receivedResponse(
    deviceActor:  ActorRef,
    reading:      DeviceGroup.TemperatureReading,
    stillWaiting: Set[ActorRef],
    repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
  ): Unit = {
    context.unwatch(deviceActor)
    val deviceId = actorToDeviceId(deviceActor)
    val newStillWaiting = stillWaiting - deviceActor

    val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
    if (newStillWaiting.isEmpty) {
      requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
      context.stop(self)
    } else {
      context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
    }
  }

}
Java
public class DeviceGroupQuery extends AbstractActor {
  public static final class CollectionTimeout {
  }

  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final Map<ActorRef, String> actorToDeviceId;
  final long requestId;
  final ActorRef requester;

  Cancellable queryTimeoutTimer;

  public DeviceGroupQuery(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
    this.actorToDeviceId = actorToDeviceId;
    this.requestId = requestId;
    this.requester = requester;

    queryTimeoutTimer = getContext().getSystem().scheduler().scheduleOnce(
            timeout, getSelf(), new CollectionTimeout(), getContext().dispatcher(), getSelf()
    );
  }

  public static Props props(Map<ActorRef, String> actorToDeviceId, long requestId, ActorRef requester, FiniteDuration timeout) {
    return Props.create(DeviceGroupQuery.class, actorToDeviceId, requestId, requester, timeout);
  }

  @Override
  public void preStart() {
    for (ActorRef deviceActor : actorToDeviceId.keySet()) {
      getContext().watch(deviceActor);
      deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
    }
  }

  @Override
  public void postStop() {
    queryTimeoutTimer.cancel();
  }

  @Override
  public Receive createReceive() {
    return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
  }

  public Receive waitingForReplies(
          Map<String, DeviceGroup.TemperatureReading> repliesSoFar,
          Set<ActorRef> stillWaiting) {
    return receiveBuilder()
            .match(Device.RespondTemperature.class, r -> {
              ActorRef deviceActor = getSender();
              DeviceGroup.TemperatureReading reading = r.value
                      .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
                      .orElse(new DeviceGroup.TemperatureNotAvailable());
              receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
            })
            .match(Terminated.class, t -> {
              receivedResponse(t.getActor(), new DeviceGroup.DeviceNotAvailable(), stillWaiting, repliesSoFar);
            })
            .match(CollectionTimeout.class, t -> {
              Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
              for (ActorRef deviceActor : stillWaiting) {
                String deviceId = actorToDeviceId.get(deviceActor);
                replies.put(deviceId, new DeviceGroup.DeviceTimedOut());
              }
              requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
              getContext().stop(getSelf());
            })
            .build();
  }

  public void receivedResponse(ActorRef deviceActor,
                               DeviceGroup.TemperatureReading reading,
                               Set<ActorRef> stillWaiting,
                               Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
    getContext().unwatch(deviceActor);
    String deviceId = actorToDeviceId.get(deviceActor);

    Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
    newStillWaiting.remove(deviceActor);

    Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
    newRepliesSoFar.put(deviceId, reading);
    if (newStillWaiting.isEmpty()) {
      requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
      getContext().stop(getSelf());
    } else {
      getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
    }
  }
}

Testing the query actor

Now let’s verify the correctness of the query actor implementation. There are various scenarios we need to test individually to make sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise various normal or failure scenarios. Thankfully we took the list of collaborators (actually a Map) as a parameter to the query actor, so we can easily pass in TestProbe TestKit references. In our first test, we try out the case when there are two devices and both report a temperature:

Scala
"return temperature value for working devices" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(DeviceGroupQuery.props(
    actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
    requestId = 1,
    requester = requester.ref,
    timeout = 3.seconds
  ))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)

  requester.expectMsg(DeviceGroup.RespondAllTemperatures(
    requestId = 1,
    temperatures = Map(
      "device1" -> DeviceGroup.Temperature(1.0),
      "device2" -> DeviceGroup.Temperature(2.0)
    )
  ))
}
Java
@Test
public void testReturnTemperatureValueForWorkingDevices() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
          actorToDeviceId,
          1L,
          requester.getRef(),
          new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());

  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

That was the happy case, but we know that sometimes devices cannot provide a temperature measurement. This scenario is just slightly different from the previous:

Scala
"return TemperatureNotAvailable for devices with no readings" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(DeviceGroupQuery.props(
    actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
    requestId = 1,
    requester = requester.ref,
    timeout = 3.seconds
  ))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)

  requester.expectMsg(DeviceGroup.RespondAllTemperatures(
    requestId = 1,
    temperatures = Map(
      "device1" -> DeviceGroup.TemperatureNotAvailable,
      "device2" -> DeviceGroup.Temperature(2.0)
    )
  ))
}
Java
@Test
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
          actorToDeviceId,
          1L,
          requester.getRef(),
          new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());

  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.TemperatureNotAvailable());
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

We also know, that sometimes device actors stop before answering:

Scala
"return DeviceNotAvailable if device stops before answering" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(DeviceGroupQuery.props(
    actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
    requestId = 1,
    requester = requester.ref,
    timeout = 3.seconds
  ))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  device2.ref ! PoisonPill

  requester.expectMsg(DeviceGroup.RespondAllTemperatures(
    requestId = 1,
    temperatures = Map(
      "device1" -> DeviceGroup.Temperature(1.0),
      "device2" -> DeviceGroup.DeviceNotAvailable
    )
  ))
}
Java
@Test
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
          actorToDeviceId,
          1L,
          requester.getRef(),
          new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());

  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.DeviceNotAvailable());

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

If you remember, there is another case related to device actors stopping. It is possible that we get a normal reply from a device actor, but then receive a Terminated for the same actor later. In this case, we would like to keep the first reply and not mark the device as DeviceNotAvailable. We should test this, too:

Scala
"return temperature reading even if device stops after answering" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(DeviceGroupQuery.props(
    actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
    requestId = 1,
    requester = requester.ref,
    timeout = 3.seconds
  ))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
  device2.ref ! PoisonPill

  requester.expectMsg(DeviceGroup.RespondAllTemperatures(
    requestId = 1,
    temperatures = Map(
      "device1" -> DeviceGroup.Temperature(1.0),
      "device2" -> DeviceGroup.Temperature(2.0)
    )
  ))
}
Java
@Test
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
          actorToDeviceId,
          1L,
          requester.getRef(),
          new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
  device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());

  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

The final case is when not all devices respond in time. To keep our test relatively fast, we will construct the DeviceGroupQuery actor with a smaller timeout:

Scala
"return DeviceTimedOut if device does not answer in time" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(DeviceGroupQuery.props(
    actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
    requestId = 1,
    requester = requester.ref,
    timeout = 1.second
  ))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)

  requester.expectMsg(DeviceGroup.RespondAllTemperatures(
    requestId = 1,
    temperatures = Map(
      "device1" -> DeviceGroup.Temperature(1.0),
      "device2" -> DeviceGroup.DeviceTimedOut
    )
  ))
}
Java
@Test
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor = system.actorOf(DeviceGroupQuery.props(
          actorToDeviceId,
          1L,
          requester.getRef(),
          new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());

  DeviceGroup.RespondAllTemperatures response = requester.expectMsgClass(
          FiniteDuration.create(5, TimeUnit.SECONDS),
          DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.DeviceTimedOut());

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

Our query works as expected now, it is time to include this new functionality in the DeviceGroup actor now.

Adding query capability to the group

Including the query feature in the group actor is fairly simple now. We did all the heavy lifting in the query actor itself, the group actor only needs to create it with the right initial parameters and nothing else.

Scala
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]
  var nextCollectionId = 0L

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    // ... other cases omitted

    case RequestAllTemperatures(requestId) =>
      context.actorOf(DeviceGroupQuery.props(
        actorToDeviceId = actorToDeviceId,
        requestId = requestId,
        requester = sender(),
        3.seconds
      ))
  }

}
Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, groupId);
  }

  public static final class RequestDeviceList {
    final long requestId;

    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;

    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }

  public static final class RequestAllTemperatures {
    final long requestId;

    public RequestAllTemperatures(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondAllTemperatures {
    final long requestId;
    final Map<String, TemperatureReading> temperatures;

    public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
      this.requestId = requestId;
      this.temperatures = temperatures;
    }
  }

  public static interface TemperatureReading {
  }

  public static final class Temperature implements TemperatureReading {
    public final double value;

    public Temperature(double value) {
      this.value = value;
    }
  }

  public static final class TemperatureNotAvailable implements TemperatureReading {
  }

  public static final class DeviceNotAvailable implements TemperatureReading {
  }

  public static final class DeviceTimedOut implements TemperatureReading {
  }


  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  final long nextCollectionId = 0L;

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }


  private void onAllTemperatures(RequestAllTemperatures r) {
    getContext().actorOf(DeviceGroupQuery.props(
            actorToDeviceId, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
  }

  @Override
  public Receive createReceive() {
            // ... other cases omitted
            .match(RequestAllTemperatures.class, this::onAllTemperatures)
            .build();
  }
}

It is probably worth restating what we said at the beginning of the chapter. By keeping the temporary state that is only relevant to the query itself in a separate actor we keep the group actor implementation very simple. It delegates everything to child actors and therefore does not have to keep state that is not relevant to its core business. Also, multiple queries can now run parallel to each other, in fact, as many as needed. In our case querying an individual device actor is a fast operation, but if this were not the case, for example, because the remote sensors need to be contacted over the network, this design would significantly improve throughput.

We close this chapter by testing that everything works together. This test is just a variant of the previous ones, now exercising the group query feature:

Scala
"be able to collect temperatures from all active devices" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor3 = probe.lastSender

  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
  // No temperature for device3

  groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
  probe.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 0,
      temperatures = Map(
        "device1" -> DeviceGroup.Temperature(1.0),
        "device2" -> DeviceGroup.Temperature(2.0),
        "device3" -> DeviceGroup.TemperatureNotAvailable)))
}
Java
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor3 = probe.getLastSender();

  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  // No temperature for device 3

  groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
  DeviceGroup.RespondAllTemperatures response = probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(0L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
  expectedTemperatures.put("device3", new DeviceGroup.TemperatureNotAvailable());

  assertEqualTemperatures(expectedTemperatures, response.temperatures);
}

Summary

In the context of the IoT system, this guide introduced the following concepts, among others. You can follow the links to review them if necessary:

What’s Next?

To continue your journey with Akka, we recommend:

The source code for this page can be found here.