Part 5: Querying Device Groups
Dependency
Add the following dependency in your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
- Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}
Introduction
The conversational patterns that we have seen so far are simple in the sense that they require the actor to keep little or no state. Specifically:
- Device actors return a reading, which requires no state change
- Device actors record a temperature, which updates a single field
- Device Group actors maintain group membership by adding or removing entries from a map
In this part, we will use a more complex example. Since homeowners will be interested in the temperatures throughout their home, our goal is to be able to query all of the device actors in a group. Let us start by investigating how such a query API should behave.
Dealing with possible scenarios
The very first issue we face is that the membership of a group is dynamic. Each sensor device is represented by an actor that can stop at any time. At the beginning of the query, we can ask all of the existing device actors for the current temperature. However, during the lifecycle of the query:
- A device actor might stop and not be able to respond back with a temperature reading.
- A new device actor might start up and not be included in the query because we weren’t aware of it.
These issues can be addressed in many different ways, but the important point is to settle on the desired behavior. The following works well for our use case:
- When a query arrives, the group actor takes a snapshot of the existing device actors and will only ask those actors for the temperature.
- Actors that start up after the query arrives are ignored.
- If an actor in the snapshot stops during the query without answering, we will report the fact that it stopped to the sender of the query message.
Apart from device actors coming and going dynamically, some actors might take a long time to answer. For example, they could be stuck in an accidental infinite loop, or fail due to a bug and drop our request. We don’t want the query to continue indefinitely, so we will consider it complete in either of the following cases:
- All actors in the snapshot have either responded or have confirmed being stopped.
- We reach a pre-defined deadline.
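The snapshot and completion rules above can be modelled without any actors at all. The following is a minimal sketch under stated assumptions, not part of the tutorial code: QuerySnapshot, settle and isComplete are hypothetical names, devices are identified by plain strings, and time is passed in explicitly as milliseconds.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, Akka-free model of the rules above; QuerySnapshot and its
// methods are illustrative names, not part of the tutorial code.
final class QuerySnapshot {
    private final Set<String> pending;   // snapshot members that have not settled yet
    private final long deadlineMillis;   // absolute time at which we stop waiting

    QuerySnapshot(Set<String> devicesAtQueryTime, long nowMillis, long timeoutMillis) {
        // Copy the membership so that devices added later are not part of the query.
        this.pending = new HashSet<>(devicesAtQueryTime);
        this.deadlineMillis = nowMillis + timeoutMillis;
    }

    // Marks a device as having responded or having been confirmed stopped.
    // Returns false if the device is not pending (for example, it joined after the snapshot).
    boolean settle(String deviceId) {
        return pending.remove(deviceId);
    }

    // The query is complete when nothing is pending or the deadline has been reached.
    boolean isComplete(long nowMillis) {
        return pending.isEmpty() || nowMillis >= deadlineMillis;
    }

    public static void main(String[] args) {
        Set<String> group = new HashSet<>();
        group.add("device1");
        group.add("device2");
        QuerySnapshot query = new QuerySnapshot(group, 0L, 3000L);
        group.add("device3");                          // started after the query arrived
        System.out.println(query.settle("device3"));   // false: not in the snapshot
        System.out.println(query.settle("device1"));   // true
        System.out.println(query.isComplete(100L));    // false: device2 is still pending
        System.out.println(query.isComplete(3000L));   // true: deadline reached
    }
}
```

The defensive copy in the constructor is what makes later membership changes invisible to a running query; the actor-based implementation reaches the same effect by handing the query an immutable map.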
Given these decisions, along with the fact that a device in the snapshot might have just started and not yet received a temperature to record, we can define four states for each device actor, with respect to a temperature query:
- It has a temperature available: Temperature(value).
- It has responded, but has no temperature available yet: TemperatureNotAvailable.
- It has stopped before answering: DeviceNotAvailable.
- It did not respond before the deadline: DeviceTimedOut.
Summarizing these in message types, we can add the following to DeviceGroup:
- Scala
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])

sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
- Java
public static final class RequestAllTemperatures {
  final long requestId;

  public RequestAllTemperatures(long requestId) {
    this.requestId = requestId;
  }
}

public static final class RespondAllTemperatures {
  final long requestId;
  final Map<String, TemperatureReading> temperatures;

  public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
    this.requestId = requestId;
    this.temperatures = temperatures;
  }
}

public static interface TemperatureReading {}

public static final class Temperature implements TemperatureReading {
  public final double value;

  public Temperature(double value) {
    this.value = value;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    Temperature that = (Temperature) o;

    return Double.compare(that.value, value) == 0;
  }

  @Override
  public int hashCode() {
    long temp = Double.doubleToLongBits(value);
    return (int) (temp ^ (temp >>> 32));
  }

  @Override
  public String toString() {
    return "Temperature{" + "value=" + value + '}';
  }
}

public enum TemperatureNotAvailable implements TemperatureReading {
  INSTANCE
}

public enum DeviceNotAvailable implements TemperatureReading {
  INSTANCE
}

public enum DeviceTimedOut implements TemperatureReading {
  INSTANCE
}
Implementing the query
One approach for implementing the query involves adding code to the device group actor. However, in practice this can be very cumbersome and error-prone. Remember that when we start a query, we need to take a snapshot of the devices present and start a timer so that we can enforce the deadline. In the meantime, another query can arrive. For the second query we need to keep track of the exact same information but in isolation from the previous query. This would require us to maintain separate mappings between queries and device actors.
Instead, we will implement a simpler and superior approach. We will create an actor that represents a single query and that performs the tasks needed to complete the query on behalf of the group actor. So far we have created actors that belonged to classical domain objects, but now we will create an actor that represents a process or a task rather than an entity. We benefit by keeping our device group actor simple and being able to better test query capability in isolation.
Defining the query actor
First, we need to design the lifecycle of our query actor. This consists of identifying its initial state, the first action it will take, and the cleanup, if necessary. The query actor will need the following information:
- The snapshot and IDs of active device actors to query.
- The ID of the request that started the query (so that we can include it in the reply).
- The reference of the actor who sent the query. We will send the reply to this actor directly.
- A deadline that indicates how long the query should wait for replies. Making this a parameter will simplify testing.
Scheduling the query timeout
Since we need a way to indicate how long we are willing to wait for responses, it is time to introduce a new Akka feature that we have not used yet, the built-in scheduler facility. Using the scheduler is simple:
- We get the scheduler from the ActorSystem, which, in turn, is accessible from the actor’s context: context.system.scheduler in Scala, getContext().getSystem().scheduler() in Java. The scheduler needs an ExecutionContext (an implicit one in Scala), which is the thread pool that will execute the timer task itself. In our case, we use the same dispatcher as the actor, by importing context.dispatcher in Scala or by passing in getContext().getDispatcher() in Java.
- The scheduler.scheduleOnce(time, actorRef, message) method (in Java: scheduler.scheduleOnce(time, actorRef, message, executor, sender)) schedules message to be sent to the actor actorRef after the delay time.
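The idea of a one-shot timer that can be cancelled is not specific to Akka. As a rough analogy only, the sketch below uses the JDK's ScheduledExecutorService instead of the Akka scheduler: schedule plays the role of scheduleOnce, and the returned ScheduledFuture plays the role of a cancellable handle. OneShotTimerSketch and firesWithin are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// JDK-only analogy for a one-shot, cancellable timer; this is NOT the Akka scheduler API.
final class OneShotTimerSketch {

    // Schedules a one-shot task after delayMillis, optionally cancels it right away,
    // and reports whether the task ran within waitMillis.
    static boolean firesWithin(long delayMillis, long waitMillis, boolean cancelFirst) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch fired = new CountDownLatch(1);
            Runnable timeoutTask = fired::countDown;
            ScheduledFuture<?> timer =
                scheduler.schedule(timeoutTask, delayMillis, TimeUnit.MILLISECONDS);
            if (cancelFirst) {
                // Comparable to cancelling the timer when the query finishes in time.
                timer.cancel(false);
            }
            return fired.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The first timer is left alone; the second is cancelled before its delay elapses.
        System.out.println(firesWithin(50, 2000, false));
        System.out.println(firesWithin(200, 500, true));
    }
}
```

In the actor version the timer does not run a callback; it delivers a message to the actor, so the timeout is handled like any other message.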
We need to create a message that represents the query timeout. We create a simple message CollectionTimeout without any parameters for this purpose. The return value from scheduleOnce is a Cancellable, which can be used to cancel the timer if the query finishes successfully in time. At the start of the query, we need to ask each of the device actors for the current temperature. To be able to quickly detect devices that stopped before they got the ReadTemperature message, we will also watch each of the actors. This way, we get Terminated messages for those that stop during the lifetime of the query, so we don’t need to wait until the timeout to mark these as not available.
Putting this together, the outline of our DeviceGroupQuery actor looks like this:
- Scala
object DeviceGroupQuery {
  case object CollectionTimeout

  def props(
      actorToDeviceId: Map[ActorRef, String],
      requestId: Long,
      requester: ActorRef,
      timeout: FiniteDuration): Props = {
    Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
  }
}

class DeviceGroupQuery(
    actorToDeviceId: Map[ActorRef, String],
    requestId: Long,
    requester: ActorRef,
    timeout: FiniteDuration)
    extends Actor
    with ActorLogging {
  import DeviceGroupQuery._
  import context.dispatcher
  val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)

  override def preStart(): Unit = {
    actorToDeviceId.keysIterator.foreach { deviceActor =>
      context.watch(deviceActor)
      deviceActor ! Device.ReadTemperature(0)
    }
  }

  override def postStop(): Unit = {
    queryTimeoutTimer.cancel()
  }

}
- Java
public class DeviceGroupQuery extends AbstractActor {
  public static final class CollectionTimeout {}

  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final Map<ActorRef, String> actorToDeviceId;
  final long requestId;
  final ActorRef requester;

  Cancellable queryTimeoutTimer;

  public DeviceGroupQuery(
      Map<ActorRef, String> actorToDeviceId,
      long requestId,
      ActorRef requester,
      FiniteDuration timeout) {
    this.actorToDeviceId = actorToDeviceId;
    this.requestId = requestId;
    this.requester = requester;

    queryTimeoutTimer =
        getContext()
            .getSystem()
            .scheduler()
            .scheduleOnce(
                timeout,
                getSelf(),
                new CollectionTimeout(),
                getContext().getDispatcher(),
                getSelf());
  }

  public static Props props(
      Map<ActorRef, String> actorToDeviceId,
      long requestId,
      ActorRef requester,
      FiniteDuration timeout) {
    return Props.create(
        DeviceGroupQuery.class,
        () -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
  }

  @Override
  public void preStart() {
    for (ActorRef deviceActor : actorToDeviceId.keySet()) {
      getContext().watch(deviceActor);
      deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
    }
  }

  @Override
  public void postStop() {
    queryTimeoutTimer.cancel();
  }
}
Tracking actor state
The query actor, apart from the pending timer, has one stateful aspect: tracking the set of actors that have replied, have stopped, or have not replied. One way to track this state is to create a mutable field in the actor (a var in Scala). A different approach takes advantage of the ability to change how an actor responds to messages. A Receive is just a function (or an object, if you like) that can be returned from another function. By default, the receive block defines the behavior of the actor, but it is possible to change it multiple times during the life of the actor. We call context.become(newBehavior) where newBehavior is anything with type Receive (which is a shorthand for PartialFunction[Any, Unit]). We will leverage this feature to track the state of our actor.
For our use case:
- Instead of defining receive directly, we delegate to a waitingForReplies function to create the Receive.
- The waitingForReplies function will keep track of two changing values:
  - a Map of already received replies
  - a Set of actors that we still wait on
- We have three events to act on:
  - We can receive a RespondTemperature message from one of the devices.
  - We can receive a Terminated message for a device actor that has been stopped in the meantime.
  - We can reach the deadline and receive a CollectionTimeout.
In the first two cases, we need to keep track of the replies, which we now delegate to a method receivedResponse, which we will discuss later. In the case of timeout, we need to simply take all the actors that have not yet replied (the members of the set stillWaiting) and put a DeviceTimedOut as the status in the final reply. Then we reply to the submitter of the query with the collected results and stop the query actor.
To accomplish this, add the following to your DeviceGroupQuery source file:
- Scala
override def receive: Receive =
  waitingForReplies(Map.empty, actorToDeviceId.keySet)

def waitingForReplies(
    repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
    stillWaiting: Set[ActorRef]): Receive = {
  case Device.RespondTemperature(0, valueOption) =>
    val deviceActor = sender()
    val reading = valueOption match {
      case Some(value) => DeviceGroup.Temperature(value)
      case None        => DeviceGroup.TemperatureNotAvailable
    }
    receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)

  case Terminated(deviceActor) =>
    receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)

  case CollectionTimeout =>
    val timedOutReplies =
      stillWaiting.map { deviceActor =>
        val deviceId = actorToDeviceId(deviceActor)
        deviceId -> DeviceGroup.DeviceTimedOut
      }
    requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
    context.stop(self)
}
- Java
@Override
public Receive createReceive() {
  return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
}

public Receive waitingForReplies(
    Map<String, DeviceGroup.TemperatureReading> repliesSoFar, Set<ActorRef> stillWaiting) {
  return receiveBuilder()
      .match(
          Device.RespondTemperature.class,
          r -> {
            ActorRef deviceActor = getSender();
            DeviceGroup.TemperatureReading reading =
                r.value
                    .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
                    .orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE);
            receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
          })
      .match(
          Terminated.class,
          t -> {
            receivedResponse(
                t.getActor(),
                DeviceGroup.DeviceNotAvailable.INSTANCE,
                stillWaiting,
                repliesSoFar);
          })
      .match(
          CollectionTimeout.class,
          t -> {
            Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
            for (ActorRef deviceActor : stillWaiting) {
              String deviceId = actorToDeviceId.get(deviceActor);
              replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE);
            }
            requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
            getContext().stop(getSelf());
          })
      .build();
}
It is not yet clear how we will “mutate” the repliesSoFar and stillWaiting data structures. One important thing to note is that the function waitingForReplies does not handle the messages directly. It returns a Receive function that will handle the messages. This means that if we call waitingForReplies again, with different parameters, then it returns a brand new Receive that will use those new parameters.
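The same pattern can be seen in isolation, without Akka. In the following sketch, Behavior is a hypothetical stand-in for Receive, and waitingFor is a hypothetical factory in the spirit of waitingForReplies: each call captures its arguments and returns a new handler, and handling a message returns the handler to use next (which is what context.become would install in an actor).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Akka-free illustration; Behavior and waitingFor are hypothetical names.
final class BehaviorFactorySketch {

    // Stand-in for Receive: handles one message and returns the next behavior.
    interface Behavior {
        Behavior onMessage(String deviceId);
    }

    // Factory method: the returned behavior only ever sees the parameters it was
    // created with, so "changing state" means creating a new behavior.
    static Behavior waitingFor(Set<String> stillWaiting, List<String> log) {
        return deviceId -> {
            Set<String> next = new HashSet<>(stillWaiting);
            next.remove(deviceId);
            log.add(deviceId + " replied, " + next.size() + " pending");
            return waitingFor(next, log); // analogous to context.become(waitingForReplies(...))
        };
    }

    public static void main(String[] args) {
        Set<String> devices = new HashSet<>();
        devices.add("device1");
        devices.add("device2");
        List<String> log = new ArrayList<>();

        Behavior initial = waitingFor(devices, log);
        Behavior afterFirst = initial.onMessage("device1");
        afterFirst.onMessage("device2");
        System.out.println(log); // [device1 replied, 1 pending, device2 replied, 0 pending]
    }
}
```

Note that the initial behavior is never modified: sending it another message would still start from the original two-device set. Only the behavior that is currently installed matters.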
We have seen how we can install the initial Receive by returning it from receive. In order to install a new one, to record a new reply, for example, we need some mechanism. This mechanism is the method context.become(newReceive), which will change the actor’s message handling function to the provided newReceive function. You can imagine that before starting, your actor automatically calls context.become(receive), i.e. installing the Receive function that is returned from receive. This is another important observation: it is not receive that handles the messages, it returns a Receive function that will actually handle the messages.
We now have to figure out what to do in receivedResponse. First, we need to record the new result in the map repliesSoFar and remove the actor from stillWaiting. The next step is to check if there are any remaining actors we are waiting for. If there are none, we send the result of the query to the original requester and stop the query actor. Otherwise, we need to update the repliesSoFar and stillWaiting structures and wait for more messages.
In the code above, we treated Terminated as the implicit response DeviceNotAvailable, so receivedResponse does not need to do anything special. However, there is one small task we still need to do. It is possible that we receive a proper response from a device actor, but then it stops during the lifetime of the query. We don’t want this second event to overwrite the already received reply. In other words, we don’t want to receive Terminated after we recorded the response. This is simple to achieve by calling context.unwatch(ref). This method also ensures that we don’t receive Terminated events that are already in the mailbox of the actor. It is also safe to call this multiple times: only the first call will have any effect, and the rest are ignored.
With all this knowledge, we can create the receivedResponse method:
- Scala
def receivedResponse(
    deviceActor: ActorRef,
    reading: DeviceGroup.TemperatureReading,
    stillWaiting: Set[ActorRef],
    repliesSoFar: Map[String, DeviceGroup.TemperatureReading]): Unit = {
  context.unwatch(deviceActor)
  val deviceId = actorToDeviceId(deviceActor)
  val newStillWaiting = stillWaiting - deviceActor

  val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
  if (newStillWaiting.isEmpty) {
    requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
    context.stop(self)
  } else {
    context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
  }
}
- Java
public void receivedResponse(
    ActorRef deviceActor,
    DeviceGroup.TemperatureReading reading,
    Set<ActorRef> stillWaiting,
    Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
  getContext().unwatch(deviceActor);
  String deviceId = actorToDeviceId.get(deviceActor);

  Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
  newStillWaiting.remove(deviceActor);

  Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
  newRepliesSoFar.put(deviceId, reading);
  if (newStillWaiting.isEmpty()) {
    requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
    getContext().stop(getSelf());
  } else {
    getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
  }
}
It is quite natural to ask at this point what we have gained by using the context.become() trick instead of making the repliesSoFar and stillWaiting structures mutable fields of the actor (i.e. vars). In this simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have more kinds of states. Since each state might have temporary data that is relevant only to that state, keeping these as fields would pollute the global state of the actor, i.e. it is unclear what fields are used in what state. Using parameterized Receive “factory” methods, we can keep data that is only relevant to the state private to it. It is still a good exercise to rewrite the query using mutable fields (vars) instead of context.become(). However, it is recommended to get comfortable with the solution we have used here, as it helps structuring more complex actor code in a cleaner and more maintainable way.
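For comparison, the bookkeeping of the mutable-field style might look roughly like the sketch below. It is an Akka-free model under stated assumptions, not a rewrite of the actor: MutableQueryState, record and timeOut are hypothetical names, devices are identified by strings, and readings are simplified to strings instead of TemperatureReading values.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the mutable-field alternative; not part of the tutorial code.
final class MutableQueryState {
    // Both collections live for the whole lifetime of the object and are changed in place.
    private final Map<String, String> repliesSoFar = new HashMap<>();
    private final Set<String> stillWaiting;

    MutableQueryState(Set<String> deviceIds) {
        this.stillWaiting = new HashSet<>(deviceIds);
    }

    // Records a reading for a pending device. A device that has already settled is
    // ignored, so an earlier reply is never overwritten. Returns true once nothing is pending.
    boolean record(String deviceId, String reading) {
        if (stillWaiting.remove(deviceId)) {
            repliesSoFar.put(deviceId, reading);
        }
        return stillWaiting.isEmpty();
    }

    // Deadline reached: every device that is still pending is reported as timed out.
    Map<String, String> timeOut() {
        for (String deviceId : stillWaiting) {
            repliesSoFar.put(deviceId, "DeviceTimedOut");
        }
        stillWaiting.clear();
        return new HashMap<>(repliesSoFar);
    }

    public static void main(String[] args) {
        Set<String> ids = new HashSet<>();
        ids.add("device1");
        ids.add("device2");
        MutableQueryState state = new MutableQueryState(ids);
        state.record("device1", "Temperature(1.0)");
        state.record("device1", "DeviceNotAvailable"); // ignored: device1 already replied
        System.out.println(state.timeOut());
    }
}
```

With only two fields this is perfectly readable. The trade-off described above starts to matter when an actor has several states, each with its own temporary data.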
Our query actor is now done:
- Scala
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import scala.concurrent.duration.FiniteDuration

object DeviceGroupQuery {
  case object CollectionTimeout

  def props(
      actorToDeviceId: Map[ActorRef, String],
      requestId: Long,
      requester: ActorRef,
      timeout: FiniteDuration): Props = {
    Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
  }
}

class DeviceGroupQuery(
    actorToDeviceId: Map[ActorRef, String],
    requestId: Long,
    requester: ActorRef,
    timeout: FiniteDuration)
    extends Actor
    with ActorLogging {
  import DeviceGroupQuery._
  import context.dispatcher
  val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)

  override def preStart(): Unit = {
    actorToDeviceId.keysIterator.foreach { deviceActor =>
      context.watch(deviceActor)
      deviceActor ! Device.ReadTemperature(0)
    }
  }

  override def postStop(): Unit = {
    queryTimeoutTimer.cancel()
  }

  override def receive: Receive =
    waitingForReplies(Map.empty, actorToDeviceId.keySet)

  def waitingForReplies(
      repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
      stillWaiting: Set[ActorRef]): Receive = {
    case Device.RespondTemperature(0, valueOption) =>
      val deviceActor = sender()
      val reading = valueOption match {
        case Some(value) => DeviceGroup.Temperature(value)
        case None        => DeviceGroup.TemperatureNotAvailable
      }
      receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)

    case Terminated(deviceActor) =>
      receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)

    case CollectionTimeout =>
      val timedOutReplies =
        stillWaiting.map { deviceActor =>
          val deviceId = actorToDeviceId(deviceActor)
          deviceId -> DeviceGroup.DeviceTimedOut
        }
      requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
      context.stop(self)
  }

  def receivedResponse(
      deviceActor: ActorRef,
      reading: DeviceGroup.TemperatureReading,
      stillWaiting: Set[ActorRef],
      repliesSoFar: Map[String, DeviceGroup.TemperatureReading]): Unit = {
    context.unwatch(deviceActor)
    val deviceId = actorToDeviceId(deviceActor)
    val newStillWaiting = stillWaiting - deviceActor

    val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
    if (newStillWaiting.isEmpty) {
      requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
      context.stop(self)
    } else {
      context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
    }
  }

}
- Java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import scala.concurrent.duration.FiniteDuration;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeviceGroupQuery extends AbstractActor {
  public static final class CollectionTimeout {}

  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final Map<ActorRef, String> actorToDeviceId;
  final long requestId;
  final ActorRef requester;

  Cancellable queryTimeoutTimer;

  public DeviceGroupQuery(
      Map<ActorRef, String> actorToDeviceId,
      long requestId,
      ActorRef requester,
      FiniteDuration timeout) {
    this.actorToDeviceId = actorToDeviceId;
    this.requestId = requestId;
    this.requester = requester;

    queryTimeoutTimer =
        getContext()
            .getSystem()
            .scheduler()
            .scheduleOnce(
                timeout,
                getSelf(),
                new CollectionTimeout(),
                getContext().getDispatcher(),
                getSelf());
  }

  public static Props props(
      Map<ActorRef, String> actorToDeviceId,
      long requestId,
      ActorRef requester,
      FiniteDuration timeout) {
    return Props.create(
        DeviceGroupQuery.class,
        () -> new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
  }

  @Override
  public void preStart() {
    for (ActorRef deviceActor : actorToDeviceId.keySet()) {
      getContext().watch(deviceActor);
      deviceActor.tell(new Device.ReadTemperature(0L), getSelf());
    }
  }

  @Override
  public void postStop() {
    queryTimeoutTimer.cancel();
  }

  @Override
  public Receive createReceive() {
    return waitingForReplies(new HashMap<>(), actorToDeviceId.keySet());
  }

  public Receive waitingForReplies(
      Map<String, DeviceGroup.TemperatureReading> repliesSoFar, Set<ActorRef> stillWaiting) {
    return receiveBuilder()
        .match(
            Device.RespondTemperature.class,
            r -> {
              ActorRef deviceActor = getSender();
              DeviceGroup.TemperatureReading reading =
                  r.value
                      .map(v -> (DeviceGroup.TemperatureReading) new DeviceGroup.Temperature(v))
                      .orElse(DeviceGroup.TemperatureNotAvailable.INSTANCE);
              receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar);
            })
        .match(
            Terminated.class,
            t -> {
              receivedResponse(
                  t.getActor(),
                  DeviceGroup.DeviceNotAvailable.INSTANCE,
                  stillWaiting,
                  repliesSoFar);
            })
        .match(
            CollectionTimeout.class,
            t -> {
              Map<String, DeviceGroup.TemperatureReading> replies = new HashMap<>(repliesSoFar);
              for (ActorRef deviceActor : stillWaiting) {
                String deviceId = actorToDeviceId.get(deviceActor);
                replies.put(deviceId, DeviceGroup.DeviceTimedOut.INSTANCE);
              }
              requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, replies), getSelf());
              getContext().stop(getSelf());
            })
        .build();
  }

  public void receivedResponse(
      ActorRef deviceActor,
      DeviceGroup.TemperatureReading reading,
      Set<ActorRef> stillWaiting,
      Map<String, DeviceGroup.TemperatureReading> repliesSoFar) {
    getContext().unwatch(deviceActor);
    String deviceId = actorToDeviceId.get(deviceActor);

    Set<ActorRef> newStillWaiting = new HashSet<>(stillWaiting);
    newStillWaiting.remove(deviceActor);

    Map<String, DeviceGroup.TemperatureReading> newRepliesSoFar = new HashMap<>(repliesSoFar);
    newRepliesSoFar.put(deviceId, reading);
    if (newStillWaiting.isEmpty()) {
      requester.tell(new DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar), getSelf());
      getContext().stop(getSelf());
    } else {
      getContext().become(waitingForReplies(newRepliesSoFar, newStillWaiting));
    }
  }
}
Testing the query actor
Now let’s verify the correctness of the query actor implementation. There are various scenarios we need to test individually to make sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise various normal or failure scenarios. Thankfully we took the list of collaborators (actually a Map) as a parameter to the query actor, so we can pass in TestProbe references (TestKit references in Java). In our first test, we try out the case when there are two devices and both report a temperature:
- Scala
"return temperature value for working devices" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(
    DeviceGroupQuery.props(
      actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
      requestId = 1,
      requester = requester.ref,
      timeout = 3.seconds))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)

  requester.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 1,
      temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.Temperature(2.0))))
}
- Java
@Test
public void testReturnTemperatureValueForWorkingDevices() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor =
      system.actorOf(
          DeviceGroupQuery.props(
              actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());

  DeviceGroup.RespondAllTemperatures response =
      requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEquals(expectedTemperatures, response.temperatures);
}
That was the happy case, but we know that sometimes devices cannot provide a temperature measurement. This scenario is just slightly different from the previous:
- Scala
"return TemperatureNotAvailable for devices with no readings" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(
    DeviceGroupQuery.props(
      actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
      requestId = 1,
      requester = requester.ref,
      timeout = 3.seconds))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)

  requester.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 1,
      temperatures =
        Map("device1" -> DeviceGroup.TemperatureNotAvailable, "device2" -> DeviceGroup.Temperature(2.0))))
}
- Java
@Test
public void testReturnTemperatureNotAvailableForDevicesWithNoReadings() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor =
      system.actorOf(
          DeviceGroupQuery.props(
              actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.empty()), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());

  DeviceGroup.RespondAllTemperatures response =
      requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", DeviceGroup.TemperatureNotAvailable.INSTANCE);
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEquals(expectedTemperatures, response.temperatures);
}
We also know that sometimes device actors stop before answering:
- Scala
"return DeviceNotAvailable if device stops before answering" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(
    DeviceGroupQuery.props(
      actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
      requestId = 1,
      requester = requester.ref,
      timeout = 3.seconds))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  device2.ref ! PoisonPill

  requester.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 1,
      temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.DeviceNotAvailable)))
}
- Java
@Test
public void testReturnDeviceNotAvailableIfDeviceStopsBeforeAnswering() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor =
      system.actorOf(
          DeviceGroupQuery.props(
              actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());

  DeviceGroup.RespondAllTemperatures response =
      requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", DeviceGroup.DeviceNotAvailable.INSTANCE);

  assertEquals(expectedTemperatures, response.temperatures);
}
If you remember, there is another case related to device actors stopping. It is possible that we get a normal reply from a device actor, but then receive a Terminated for the same actor later. In this case, we would like to keep the first reply and not mark the device as DeviceNotAvailable. We should test this, too:
- Scala
"return temperature reading even if device stops after answering" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(
    DeviceGroupQuery.props(
      actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
      requestId = 1,
      requester = requester.ref,
      timeout = 3.seconds))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
  device2.ref ! PoisonPill

  requester.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 1,
      temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.Temperature(2.0))))
}
- Java
@Test
public void testReturnTemperatureReadingEvenIfDeviceStopsAfterAnswering() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor =
      system.actorOf(
          DeviceGroupQuery.props(
              actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(3, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());
  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(2.0)), device2.getRef());
  device2.getRef().tell(PoisonPill.getInstance(), ActorRef.noSender());

  DeviceGroup.RespondAllTemperatures response =
      requester.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));

  assertEquals(expectedTemperatures, response.temperatures);
}
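The rule exercised by this test, that the first outcome recorded for a device wins, can be modelled without any Akka machinery. The sketch below is a simplified, hypothetical stand-in for the query actor's bookkeeping and not the DeviceGroupQuery implementation: the class name QueryBookkeepingSketch, its method names, and the use of plain strings in place of ActorRef and TemperatureReading values are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, Akka-free model of the bookkeeping a query has to do.
// Device ids stand in for ActorRefs; strings stand in for TemperatureReading values.
class QueryBookkeepingSketch {
  private final Set<String> stillWaiting;
  private final Map<String, String> repliesSoFar = new HashMap<>();

  QueryBookkeepingSketch(Set<String> deviceIds) {
    // Copy the ids so the caller's set is never modified.
    this.stillWaiting = new HashSet<>(deviceIds);
  }

  // Models receiving a temperature reply from a device.
  void onReply(String deviceId, double value) {
    record(deviceId, "Temperature(" + value + ")");
  }

  // Models receiving a Terminated signal for a device.
  void onTerminated(String deviceId) {
    record(deviceId, "DeviceNotAvailable");
  }

  // Only the first outcome per device counts: once a device has left the
  // waiting set, later signals about it are ignored.
  private void record(String deviceId, String reading) {
    if (stillWaiting.remove(deviceId)) {
      repliesSoFar.put(deviceId, reading);
    }
  }

  boolean isComplete() {
    return stillWaiting.isEmpty();
  }

  Map<String, String> replies() {
    return repliesSoFar;
  }
}
```

With this model, a reply from device2 followed by a termination signal for device2 leaves the recorded reading untouched, which mirrors what the test above expects from the real actor.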
The final case is when not all devices respond in time. To keep our test relatively fast, we will construct the DeviceGroupQuery actor with a smaller timeout:
- Scala
"return DeviceTimedOut if device does not answer in time" in {
  val requester = TestProbe()

  val device1 = TestProbe()
  val device2 = TestProbe()

  val queryActor = system.actorOf(
    DeviceGroupQuery.props(
      actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
      requestId = 1,
      requester = requester.ref,
      timeout = 1.second))

  device1.expectMsg(Device.ReadTemperature(requestId = 0))
  device2.expectMsg(Device.ReadTemperature(requestId = 0))

  queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)

  requester.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 1,
      temperatures = Map("device1" -> DeviceGroup.Temperature(1.0), "device2" -> DeviceGroup.DeviceTimedOut)))
}
- Java
@Test
public void testReturnDeviceTimedOutIfDeviceDoesNotAnswerInTime() {
  TestKit requester = new TestKit(system);

  TestKit device1 = new TestKit(system);
  TestKit device2 = new TestKit(system);

  Map<ActorRef, String> actorToDeviceId = new HashMap<>();
  actorToDeviceId.put(device1.getRef(), "device1");
  actorToDeviceId.put(device2.getRef(), "device2");

  ActorRef queryActor =
      system.actorOf(
          DeviceGroupQuery.props(
              actorToDeviceId, 1L, requester.getRef(), new FiniteDuration(1, TimeUnit.SECONDS)));

  assertEquals(0L, device1.expectMsgClass(Device.ReadTemperature.class).requestId);
  assertEquals(0L, device2.expectMsgClass(Device.ReadTemperature.class).requestId);

  queryActor.tell(new Device.RespondTemperature(0L, Optional.of(1.0)), device1.getRef());

  DeviceGroup.RespondAllTemperatures response =
      requester.expectMsgClass(
          java.time.Duration.ofSeconds(5), DeviceGroup.RespondAllTemperatures.class);
  assertEquals(1L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", DeviceGroup.DeviceTimedOut.INSTANCE);

  assertEquals(expectedTemperatures, response.temperatures);
}
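The outcome this test checks can be described as a merge: when the timeout fires, the replies collected so far are kept and every device that is still outstanding is reported as DeviceTimedOut. The following is a minimal sketch of that merge in plain Java, assuming string stand-ins for device actors and readings. TimeoutFillSketch and onTimeout are hypothetical names, not part of the guide's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper illustrating what a query reports when its timeout fires.
class TimeoutFillSketch {
  // Returns a new map holding the replies collected so far plus a
  // "DeviceTimedOut" marker for each device that has not answered.
  static Map<String, String> onTimeout(
      Map<String, String> repliesSoFar, Set<String> stillWaiting) {
    Map<String, String> result = new HashMap<>(repliesSoFar); // inputs stay untouched
    for (String deviceId : stillWaiting) {
      result.put(deviceId, "DeviceTimedOut");
    }
    return result;
  }
}
```

Calling onTimeout with a reply for device1 and device2 still outstanding yields a two-entry map, matching the shape of the expected response in the test above.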
Our query now works as expected, so it is time to include this new functionality in the DeviceGroup actor.
Adding query capability to the group
Including the query feature in the group actor is now fairly simple. We did all the heavy lifting in the query actor itself; the group actor only needs to create it with the right initial parameters and nothing else.
- Scala
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]
  var nextCollectionId = 0L

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    // ... other cases omitted

    case RequestAllTemperatures(requestId) =>
      context.actorOf(
        DeviceGroupQuery
          .props(actorToDeviceId = actorToDeviceId, requestId = requestId, requester = sender(), 3.seconds))
  }
}
- Java
public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  public static final class RequestDeviceList {
    final long requestId;

    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;

    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }

  public static final class RequestAllTemperatures {
    final long requestId;

    public RequestAllTemperatures(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondAllTemperatures {
    final long requestId;
    final Map<String, TemperatureReading> temperatures;

    public RespondAllTemperatures(long requestId, Map<String, TemperatureReading> temperatures) {
      this.requestId = requestId;
      this.temperatures = temperatures;
    }
  }

  public static interface TemperatureReading {}

  public static final class Temperature implements TemperatureReading {
    public final double value;

    public Temperature(double value) {
      this.value = value;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;

      Temperature that = (Temperature) o;

      return Double.compare(that.value, value) == 0;
    }

    @Override
    public int hashCode() {
      long temp = Double.doubleToLongBits(value);
      return (int) (temp ^ (temp >>> 32));
    }

    @Override
    public String toString() {
      return "Temperature{" + "value=" + value + '}';
    }
  }

  public enum TemperatureNotAvailable implements TemperatureReading {
    INSTANCE
  }

  public enum DeviceNotAvailable implements TemperatureReading {
    INSTANCE
  }

  public enum DeviceTimedOut implements TemperatureReading {
    INSTANCE
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onAllTemperatures(RequestAllTemperatures r) {
    // Java collections are mutable, and it is not safe for multiple actors
    // (and therefore threads) to modify the same mutable data structure.
    // To avoid sharing the map between actors, we make a defensive copy of it.
    //
    // Feel free to use your favourite immutable data-structures library with Akka in Java
    // applications!
    Map<ActorRef, String> actorToDeviceIdCopy = new HashMap<>(this.actorToDeviceId);

    getContext()
        .actorOf(
            DeviceGroupQuery.props(
                actorToDeviceIdCopy, r.requestId, getSender(), new FiniteDuration(3, TimeUnit.SECONDS)));
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        // ... other cases omitted
        .match(RequestAllTemperatures.class, this::onAllTemperatures)
        .build();
  }
}
It is probably worth restating what we said at the beginning of the chapter. By keeping the temporary state that is only relevant to the query itself in a separate actor, we keep the group actor implementation very simple. It delegates everything to child actors and therefore does not have to keep state that is not relevant to its core business. Also, as many queries as needed can now run in parallel with each other. In our case, querying an individual device actor is a fast operation, but if this were not the case (for example, because the remote sensors need to be contacted over the network), this design would significantly improve throughput.
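Part of what makes parallel queries safe in the Java version is that each query receives its own copy of the membership map, so later changes in the group never leak into a running query. The sketch below shows that snapshot behaviour in isolation. SnapshotSketch and its string keys are hypothetical stand-ins for the group actor and its ActorRef keys, used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of taking a defensive copy of the membership map.
class SnapshotSketch {
  // Returns an independent copy; the receiver never sees later changes
  // made to the original map.
  static Map<String, String> snapshot(Map<String, String> actorToDeviceId) {
    return new HashMap<>(actorToDeviceId);
  }

  public static void main(String[] args) {
    Map<String, String> groupState = new HashMap<>();
    groupState.put("actor-1", "device1");
    groupState.put("actor-2", "device2");

    // A query starts and takes its snapshot.
    Map<String, String> querySnapshot = snapshot(groupState);

    // Membership changes while the query is still running.
    groupState.remove("actor-2");
    groupState.put("actor-3", "device3");

    System.out.println(querySnapshot.containsKey("actor-2")); // prints true
    System.out.println(querySnapshot.containsKey("actor-3")); // prints false
  }
}
```

The copy is shallow, which is sufficient here because the keys and values themselves are immutable.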
We close this chapter by testing that everything works together. This test is a variant of the previous ones, now exercising the group query feature:
- Scala
"be able to collect temperatures from all active devices" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor3 = probe.lastSender

  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
  // No temperature for device3

  groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
  probe.expectMsg(
    DeviceGroup.RespondAllTemperatures(
      requestId = 0,
      temperatures = Map(
        "device1" -> DeviceGroup.Temperature(1.0),
        "device2" -> DeviceGroup.Temperature(2.0),
        "device3" -> DeviceGroup.TemperatureNotAvailable)))
}
- Java
@Test
public void testCollectTemperaturesFromAllActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device3"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor3 = probe.getLastSender();

  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  // No temperature for device 3

  groupActor.tell(new DeviceGroup.RequestAllTemperatures(0L), probe.getRef());
  DeviceGroup.RespondAllTemperatures response =
      probe.expectMsgClass(DeviceGroup.RespondAllTemperatures.class);
  assertEquals(0L, response.requestId);

  Map<String, DeviceGroup.TemperatureReading> expectedTemperatures = new HashMap<>();
  expectedTemperatures.put("device1", new DeviceGroup.Temperature(1.0));
  expectedTemperatures.put("device2", new DeviceGroup.Temperature(2.0));
  expectedTemperatures.put("device3", DeviceGroup.TemperatureNotAvailable.INSTANCE);

  assertEquals(expectedTemperatures, response.temperatures);
}
Summary
In the context of the IoT system, this guide introduced the following concepts, among others. You can follow the links to review them if necessary:
- The hierarchy of actors and their lifecycle
- The importance of designing messages for flexibility
- How to watch and stop actors, if necessary
What’s Next?
To continue your journey with Akka, we recommend:
- Start building your own applications with Akka, and get involved in our amazing community for help if you get stuck.
- If you’d like some additional background, read the rest of the reference documentation and check out some of the books and videos on Akka.
To get from this guide to a complete application, you would likely need to provide either a UI or an API. For this, we recommend that you look at the following technologies and see what fits you:
- Akka HTTP is an HTTP server and client library, making it possible to publish and consume HTTP endpoints
- Play Framework is a full-fledged web framework built on top of Akka HTTP. It integrates well with Akka and can be used to create a complete modern web UI
- Lagom is an opinionated microservice framework built on top of Akka, encoding many best practices around Akka and Play