Part 4: Working with Device Groups
Dependency
Add the following dependency in your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
- Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
- Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}
Introduction
Let’s take a closer look at the main functionality required by our use case. In a complete IoT system for monitoring home temperatures, the steps for connecting a device sensor to our system might look like this:
1. A sensor device in the home connects through some protocol.
2. The component managing network connections accepts the connection.
3. The sensor provides its group and device ID to register with the device manager component of our system.
4. The device manager component handles registration by looking up or creating the actor responsible for keeping sensor state.
5. The actor responds with an acknowledgement, exposing its ActorRef.
6. The networking component now uses the ActorRef for communication between the sensor and device actor without going through the device manager.
Steps 1 and 2 take place outside the boundaries of our tutorial system. In this chapter, we will start addressing steps 3-6 and create a way for sensors to register with our system and to communicate with actors. But first, we have another architectural decision — how many levels of actors should we use to represent device groups and device sensors?
One of the main design challenges for Akka programmers is choosing the best granularity for actors. In practice, depending on the characteristics of the interactions between actors, there are usually several valid ways to organize a system. In our use case, for example, it would be possible to have a single actor maintain all the groups and devices — perhaps using hash maps. It would also be reasonable to have an actor for each group that tracks the state of all devices in the same home.
The following guidelines help us choose the most appropriate actor hierarchy:
- In general, prefer larger granularity. Introducing more fine-grained actors than needed causes more problems than it solves.
- Add finer granularity when the system requires:
  - Higher concurrency.
  - Complex conversations between actors that have many states. We will see a very good example of this in the next chapter.
  - Sufficient state that it makes sense to divide into smaller actors.
  - Multiple unrelated responsibilities. Using separate actors allows individual actors to fail and be restored with little impact on others.
Device manager hierarchy
Considering the principles outlined in the previous section, we will model the device manager component as an actor tree with three levels:
- The top level supervisor actor represents the system component for devices. It is also the entry point to look up and create device group and device actors.
- At the next level, group actors each supervise the device actors for one group id (e.g. one home). They also provide services, such as querying temperature readings from all of the available devices in their group.
- Device actors manage all the interactions with the actual device sensors, such as storing temperature readings.
We chose this three-layered architecture for these reasons:
- Having groups of individual actors:
  - Isolates failures that occur in a group. If a single actor managed all device groups, an error in one group that causes a restart would wipe out the state of groups that are otherwise non-faulty.
  - Simplifies the problem of querying all the devices belonging to a group. Each group actor only contains state related to its group.
  - Increases parallelism in the system. Since each group has a dedicated actor, they run concurrently and we can query multiple groups concurrently.
- Having sensors modeled as individual device actors:
  - Isolates failures of one device actor from the rest of the devices in the group.
  - Increases the parallelism of collecting temperature readings. Network connections from different sensors communicate with their individual device actors directly, reducing contention points.
With the architecture defined, we can start working on the protocol for registering sensors.
The Registration Protocol
As the first step, we need to design the protocol both for registering a device and for creating the group and device actors that will be responsible for it. This protocol will be provided by the DeviceManager component itself because that is the only actor that is known and available up front: device groups and device actors are created on demand.
Looking at registration in more detail, we can outline the necessary functionality:
- When a DeviceManager receives a request with a group and device ID:
  - If the manager already has an actor for the device group, it forwards the request to it.
  - Otherwise, it creates a new device group actor and then forwards the request.
- The DeviceGroup actor receives the request to register an actor for the given device:
  - If the group already has an actor for the device, the group actor forwards the request to the device actor.
  - Otherwise, the DeviceGroup actor first creates a device actor and then forwards the request.
- The device actor receives the request and sends an acknowledgement to the original sender. Since the device actor acknowledges receipt (instead of the group actor), the sensor will now have the ActorRef to send messages directly to its actor.
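The look-up-or-create flow described above can be sketched in plain Java, without Akka, to make the control flow visible. This is only a model under stated assumptions: the names RegistrationSketch, ManagerModel, GroupModel, and DeviceModel are hypothetical, and ordinary method calls stand in for message passing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the registration flow; no Akka involved.
class RegistrationSketch {

    // Stands in for a device actor: it only knows which IDs it is responsible for.
    static final class DeviceModel {
        final String groupId;
        final String deviceId;

        DeviceModel(String groupId, String deviceId) {
            this.groupId = groupId;
            this.deviceId = deviceId;
        }
    }

    // Stands in for a device group actor: looks up or creates the device.
    static final class GroupModel {
        final String groupId;
        final Map<String, DeviceModel> devices = new HashMap<>();

        GroupModel(String groupId) {
            this.groupId = groupId;
        }

        DeviceModel track(String deviceId) {
            // computeIfAbsent creates the device only on the first request for this ID.
            return devices.computeIfAbsent(deviceId, id -> new DeviceModel(groupId, id));
        }
    }

    // Stands in for the device manager: looks up or creates the group, then delegates.
    static final class ManagerModel {
        final Map<String, GroupModel> groups = new HashMap<>();

        DeviceModel track(String groupId, String deviceId) {
            return groups.computeIfAbsent(groupId, GroupModel::new).track(deviceId);
        }
    }

    public static void main(String[] args) {
        ManagerModel manager = new ManagerModel();
        DeviceModel first = manager.track("home-1", "sensor-a");
        DeviceModel again = manager.track("home-1", "sensor-a");

        System.out.println("Same device reused: " + (first == again));
        System.out.println("Groups created: " + manager.groups.size());
    }
}
```

In the real actor system the returned object corresponds to the ActorRef that the device actor exposes when it acknowledges the request.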
The messages that we will use to communicate registration requests and their acknowledgement have a simple definition:
- Scala
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
- Java
public static final class RequestTrackDevice {
  public final String groupId;
  public final String deviceId;

  public RequestTrackDevice(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }
}

public static final class DeviceRegistered {}
In this case we have not included a request ID field in the messages. Since registration happens once, when the component connects the system to some network protocol, the ID is not important. However, it is usually a best practice to include a request ID.
Now, we’ll start implementing the protocol from the bottom up. In practice, both a top-down and bottom-up approach can work, but in our case, we benefit from the bottom-up approach as it allows us to immediately write tests for the new features without mocking out parts that we will need to build later.
Adding registration support to device actors
At the bottom of our hierarchy are the Device actors. Their job in the registration process is simple: reply to the registration request with an acknowledgement to the sender. It is also prudent to add a safeguard against requests that come with a mismatched group or device ID.
We will assume that the ID of the sender of the registration message is preserved in the upper layers. We will show you in the next section how this can be achieved.
The device actor registration code looks like the following. Modify your example to match.
- Scala
import akka.actor.{ Actor, ActorLogging, Props }

object Device {
  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  final case class RecordTemperature(requestId: Long, value: Double)
  final case class TemperatureRecorded(requestId: Long)

  final case class ReadTemperature(requestId: Long)
  final case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  import Device._

  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
  override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)

  override def receive: Receive = {
    // Matches only when both IDs equal this actor's own IDs
    case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
      sender() ! DeviceManager.DeviceRegistered

    // Any other combination of IDs is logged and ignored
    case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
      log.warning(
        "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
        groupId,
        deviceId,
        this.groupId,
        this.deviceId)

    case RecordTemperature(id, value) =>
      log.info("Recorded temperature reading {} with {}", value, id)
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
- Java
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import jdocs.tutorial_4.DeviceManager.DeviceRegistered;
import jdocs.tutorial_4.DeviceManager.RequestTrackDevice;

import java.util.Optional;

public class Device extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  final String deviceId;

  public Device(String groupId, String deviceId) {
    this.groupId = groupId;
    this.deviceId = deviceId;
  }

  public static Props props(String groupId, String deviceId) {
    return Props.create(Device.class, () -> new Device(groupId, deviceId));
  }

  public static final class RecordTemperature {
    final long requestId;
    final double value;

    public RecordTemperature(long requestId, double value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  public static final class TemperatureRecorded {
    final long requestId;

    public TemperatureRecorded(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReadTemperature {
    final long requestId;

    public ReadTemperature(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class RespondTemperature {
    final long requestId;
    final Optional<Double> value;

    public RespondTemperature(long requestId, Optional<Double> value) {
      this.requestId = requestId;
      this.value = value;
    }
  }

  Optional<Double> lastTemperatureReading = Optional.empty();

  @Override
  public void preStart() {
    log.info("Device actor {}-{} started", groupId, deviceId);
  }

  @Override
  public void postStop() {
    log.info("Device actor {}-{} stopped", groupId, deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            RequestTrackDevice.class,
            r -> {
              if (this.groupId.equals(r.groupId) && this.deviceId.equals(r.deviceId)) {
                getSender().tell(new DeviceRegistered(), getSelf());
              } else {
                log.warning(
                    "Ignoring TrackDevice request for {}-{}. This actor is responsible for {}-{}.",
                    r.groupId,
                    r.deviceId,
                    this.groupId,
                    this.deviceId);
              }
            })
        .match(
            RecordTemperature.class,
            r -> {
              log.info("Recorded temperature reading {} with {}", r.value, r.requestId);
              lastTemperatureReading = Optional.of(r.value);
              getSender().tell(new TemperatureRecorded(r.requestId), getSelf());
            })
        .match(
            ReadTemperature.class,
            r -> {
              getSender()
                  .tell(new RespondTemperature(r.requestId, lastTemperatureReading), getSelf());
            })
        .build();
  }
}
We used a feature of Scala pattern matching where we can check whether a certain field equals an expected value. By bracketing variables with backticks, like `variable`, the pattern will only match if it contains the value of variable in that position.
We can now write two new test cases, one exercising successful registration, the other testing the case when IDs don’t match:
- Scala
"reply to registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  probe.lastSender should ===(deviceActor)
}

"ignore wrong registration requests" in {
  val probe = TestProbe()
  val deviceActor = system.actorOf(Device.props("group", "device"))

  deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
  probe.expectNoMessage(500.milliseconds)

  deviceActor.tell(DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.ref)
  probe.expectNoMessage(500.milliseconds)
}
- Java
@Test
public void testReplyToRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "device"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  assertEquals(deviceActor, probe.getLastSender());
}

@Test
public void testIgnoreWrongRegistrationRequests() {
  TestKit probe = new TestKit(system);
  ActorRef deviceActor = system.actorOf(Device.props("group", "device"));

  deviceActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.getRef());
  probe.expectNoMessage();

  deviceActor.tell(new DeviceManager.RequestTrackDevice("group", "wrongDevice"), probe.getRef());
  probe.expectNoMessage();
}
We used the expectNoMessage() helper method from TestProbe (Scala) or TestKit (Java). This assertion waits until the defined time limit and fails if it receives any messages during this period. If no messages are received during the waiting period, the assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because they add significant test execution time.
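To illustrate why such assertions cost test time, here is a plain-Java model of a "no message within the window" check built on a blocking queue. It is a sketch only: the class NoMessageSketch and its expectNoMessage helper are hypothetical and are not the Akka TestKit implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a "no message within the window" assertion; not Akka TestKit code.
class NoMessageSketch {

    // Waits up to timeoutMillis and fails if anything arrives in the mailbox meanwhile.
    static void expectNoMessage(BlockingQueue<Object> mailbox, long timeoutMillis) {
        Object received;
        try {
            received = mailbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        if (received != null) {
            throw new AssertionError("Expected no message but received: " + received);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();

        long start = System.nanoTime();
        expectNoMessage(mailbox, 100); // passes, but only after waiting out the whole window
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("Silent mailbox: passed after about " + elapsedMillis + " ms");

        mailbox.add("unexpected");
        try {
            expectNoMessage(mailbox, 100); // fails right away because a message is waiting
        } catch (AssertionError e) {
            System.out.println("Failed as intended: " + e.getMessage());
        }
    }
}
```

The passing case always pays the full timeout, which is why every such assertion adds directly to the duration of the test suite.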
Adding registration support to device group actors
We are done with registration support at the device level. Now we have to implement it at the group level. A group actor has more work to do when it comes to registrations, including:
- Handling the registration request by either forwarding it to an existing device actor or by creating a new actor and forwarding the message.
- Tracking which device actors exist in the group and removing them from the group when they are stopped.
Handling the registration request
A device group actor must either forward the request to an existing child, or it should create one. To look up child actors by their device IDs we will use a Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java).
We also want to preserve the original sender of the request so that our device actor can reply directly. This is possible by using forward instead of the ! operator (Scala) or the tell method (Java). The only difference between the two is that forward keeps the original sender, while ! or tell sets the sender to be the current actor. Just like with our device actor, we ensure that we don’t respond to wrong group IDs. Add the following to your source file:
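The difference in sender handling can be modeled in a few lines of plain Java. This is a hedged illustration, not Akka code: the classes ForwardSketch, Envelope, Recorder, and Intermediary are hypothetical, and a string name stands in for the sender reference.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of sender handling; it mimics the idea, not Akka's implementation.
class ForwardSketch {

    // A message together with the name of the sender the receiver will see.
    static final class Envelope {
        final String message;
        final String sender;

        Envelope(String message, String sender) {
            this.message = message;
            this.sender = sender;
        }
    }

    // A receiver that records every envelope delivered to it.
    static final class Recorder {
        final List<Envelope> received = new ArrayList<>();

        void deliver(Envelope envelope) {
            received.add(envelope);
        }
    }

    // An intermediary, playing the role of the group actor.
    static final class Intermediary {
        final String name;

        Intermediary(String name) {
            this.name = name;
        }

        // "tell": the intermediary becomes the sender seen by the target.
        void tell(Envelope incoming, Recorder target) {
            target.deliver(new Envelope(incoming.message, name));
        }

        // "forward": the original sender is passed through unchanged.
        void forward(Envelope incoming, Recorder target) {
            target.deliver(new Envelope(incoming.message, incoming.sender));
        }
    }

    public static void main(String[] args) {
        Intermediary group = new Intermediary("group");
        Recorder device = new Recorder();
        Envelope request = new Envelope("RequestTrackDevice", "sensor-connection");

        group.tell(request, device);
        group.forward(request, device);

        for (Envelope e : device.received) {
            System.out.println(e.message + " from " + e.sender);
        }
    }
}
```

With the forwarding variant, a reply sent to the recorded sender goes straight back to the original requester, which is what the registration protocol relies on.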
- Scala
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import DeviceManager.RequestTrackDevice

object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          // forward keeps the original sender so the device can reply directly
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)
  }
}
- Java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;

public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        // forward keeps the original sender so the device can reply directly
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      // Log the requested group ID, not this actor's own ID twice
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .build();
  }
}
Just as we did with the device, we test this new functionality. We also test that the actors returned for the two different IDs are actually different, and we also attempt to record a temperature reading for each of the devices to see if the actors are responding.
- Scala
"be able to register a device actor" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender
  deviceActor1 should !==(deviceActor2)

  // Check that the device actors are working
  deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
  deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
  probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}

"ignore requests for wrong groupId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
  probe.expectNoMessage(500.milliseconds)
}
- Java
@Test
public void testRegisterDeviceActor() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertNotEquals(deviceActor1, deviceActor2);

  // Check that the device actors are working
  deviceActor1.tell(new Device.RecordTemperature(0L, 1.0), probe.getRef());
  assertEquals(0L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
  deviceActor2.tell(new Device.RecordTemperature(1L, 2.0), probe.getRef());
  assertEquals(1L, probe.expectMsgClass(Device.TemperatureRecorded.class).requestId);
}

@Test
public void testIgnoreRequestsForWrongGroupId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.getRef());
  probe.expectNoMessage();
}
If a device actor already exists for the registration request, we would like to use the existing actor instead of a new one. We have not tested this yet, so we need to fix this:
- Scala
"return same actor for same deviceId" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor1 = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val deviceActor2 = probe.lastSender

  deviceActor1 should ===(deviceActor2)
}
- Java
@Test
public void testReturnSameActorForSameDeviceId() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor1 = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef deviceActor2 = probe.getLastSender();
  assertEquals(deviceActor1, deviceActor2);
}
Keeping track of the device actors in the group
So far, we have implemented logic for registering device actors in the group. Devices come and go, however, so we will need a way to remove device actors from the Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java). We will assume that when a device is removed, its corresponding device actor is stopped. Supervision, as we discussed earlier, only handles error scenarios, not graceful stopping. So we need to notify the parent when one of the device actors is stopped.
Akka provides a Death Watch feature that allows an actor to watch another actor and be notified if the other actor is stopped. Unlike supervision, watching is not limited to parent-child relationships; any actor can watch any other actor as long as it knows the ActorRef. After a watched actor stops, the watcher receives a Terminated(actorRef) message, which also contains the reference to the watched actor. The watcher can either handle this message explicitly or will fail with a DeathPactException. The latter is useful if the actor can no longer perform its own duties after the watched actor has been stopped. In our case, the group should still function after one device has been stopped, so we need to handle the Terminated(actorRef) message.
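As a rough mental model of watching, the observer-style sketch below shows several unrelated parties registering interest in one object and each being told once when it stops. It is an assumption-laden illustration in plain Java: WatchSketch and Watched are hypothetical names, and this is not how Akka implements Death Watch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical observer-style model of watching; not Akka's Death Watch implementation.
class WatchSketch {

    // Something that can be watched and later stopped.
    static final class Watched {
        final String name;
        private final List<Consumer<Watched>> watchers = new ArrayList<>();
        private boolean stopped = false;

        Watched(String name) {
            this.name = name;
        }

        // Any party holding a reference may register interest, parent or not.
        void watch(Consumer<Watched> onTerminated) {
            watchers.add(onTerminated);
        }

        // Stopping notifies every watcher once, passing the stopped instance itself.
        void stop() {
            if (stopped) {
                return;
            }
            stopped = true;
            for (Consumer<Watched> watcher : watchers) {
                watcher.accept(this);
            }
        }
    }

    public static void main(String[] args) {
        Watched device = new Watched("device1");
        List<String> notifications = new ArrayList<>();

        device.watch(d -> notifications.add("group saw " + d.name + " stop"));
        device.watch(d -> notifications.add("test probe saw " + d.name + " stop"));

        device.stop();
        device.stop(); // a second stop produces no further notifications

        notifications.forEach(System.out::println);
    }
}
```

The notification carries only the stopped instance, which mirrors the fact that a Terminated message carries only the reference of the watched actor.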
Our device group actor needs to include functionality that:
- Starts watching new device actors when they are created.
- Removes a device actor from the Map[String, ActorRef] (Scala) or Map<String, ActorRef> (Java), which maps device IDs to device actors, when the notification indicates it has stopped.
Unfortunately, the Terminated message only contains the ActorRef of the child actor. We need the actor’s ID to remove it from the map of device IDs to device actors. To be able to do this removal, we need to introduce a second map, Map[ActorRef, String] (Scala) or Map<ActorRef, String> (Java), that allows us to find the device ID corresponding to a given ActorRef.
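The two-map bookkeeping can be tried out in isolation. The sketch below is a plain-Java model under stated assumptions: TwoMapSketch is a hypothetical class, and plain Object handles stand in for ActorRef values.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping model; Object handles stand in for ActorRef values.
class TwoMapSketch {
    final Map<String, Object> deviceIdToRef = new HashMap<>();
    final Map<Object, String> refToDeviceId = new HashMap<>();

    // Registers a device in both directions so either key can find the other.
    void register(String deviceId, Object ref) {
        deviceIdToRef.put(deviceId, ref);
        refToDeviceId.put(ref, deviceId);
    }

    // Handles a termination notice that carries only the reference.
    // Returns the removed device ID, or null if the reference was unknown.
    String onTerminated(Object ref) {
        String deviceId = refToDeviceId.remove(ref);
        if (deviceId != null) {
            deviceIdToRef.remove(deviceId);
        }
        return deviceId;
    }

    public static void main(String[] args) {
        TwoMapSketch group = new TwoMapSketch();
        Object ref1 = new Object();
        Object ref2 = new Object();
        group.register("device1", ref1);
        group.register("device2", ref2);

        System.out.println("Removed: " + group.onTerminated(ref1));
        System.out.println("Still tracked: " + group.deviceIdToRef.keySet());
    }
}
```

Keeping both maps updated in the same method is the design point: the reverse map makes removal by reference a direct lookup instead of a scan over all entries.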
Adding the functionality to identify the actor results in this:
- Scala
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import DeviceManager.RequestTrackDevice

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          // Watch the new child so that we receive Terminated when it stops
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId
  }
}
- Java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;

public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        // Watch the new child so that we receive Terminated when it stops
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      // Log the requested group ID, not this actor's own ID twice
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}
So far we have no means to find out which devices the device group actor keeps track of and, therefore, we cannot test our new functionality yet. To make it testable, we add a new query capability (the message RequestDeviceList(requestId: Long) in Scala, RequestDeviceList in Java) that lists the currently active device IDs:
- Scala
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import DeviceGroup._
import DeviceManager.RequestTrackDevice

object DeviceGroup {
  def props(groupId: String): Props = Props(new DeviceGroup(groupId))

  final case class RequestDeviceList(requestId: Long)
  final case class ReplyDeviceList(requestId: Long, ids: Set[String])
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)

  override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)

  override def receive: Receive = {
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      deviceIdToActor.get(trackMsg.deviceId) match {
        case Some(deviceActor) =>
          deviceActor.forward(trackMsg)
        case None =>
          log.info("Creating device actor for {}", trackMsg.deviceId)
          val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
          context.watch(deviceActor)
          actorToDeviceId += deviceActor -> trackMsg.deviceId
          deviceIdToActor += trackMsg.deviceId -> deviceActor
          deviceActor.forward(trackMsg)
      }

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning("Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId)

    // Reply with the IDs of all devices currently tracked by this group
    case RequestDeviceList(requestId) =>
      sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      log.info("Device actor for {} has been terminated", deviceId)
      actorToDeviceId -= deviceActor
      deviceIdToActor -= deviceId
  }
}
- Java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class DeviceGroup extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  final String groupId;

  public DeviceGroup(String groupId) {
    this.groupId = groupId;
  }

  public static Props props(String groupId) {
    return Props.create(DeviceGroup.class, () -> new DeviceGroup(groupId));
  }

  public static final class RequestDeviceList {
    final long requestId;

    public RequestDeviceList(long requestId) {
      this.requestId = requestId;
    }
  }

  public static final class ReplyDeviceList {
    final long requestId;
    final Set<String> ids;

    public ReplyDeviceList(long requestId, Set<String> ids) {
      this.requestId = requestId;
      this.ids = ids;
    }
  }

  final Map<String, ActorRef> deviceIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToDeviceId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceGroup {} started", groupId);
  }

  @Override
  public void postStop() {
    log.info("DeviceGroup {} stopped", groupId);
  }

  private void onTrackDevice(DeviceManager.RequestTrackDevice trackMsg) {
    if (this.groupId.equals(trackMsg.groupId)) {
      ActorRef deviceActor = deviceIdToActor.get(trackMsg.deviceId);
      if (deviceActor != null) {
        deviceActor.forward(trackMsg, getContext());
      } else {
        log.info("Creating device actor for {}", trackMsg.deviceId);
        deviceActor =
            getContext()
                .actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId);
        getContext().watch(deviceActor);
        actorToDeviceId.put(deviceActor, trackMsg.deviceId);
        deviceIdToActor.put(trackMsg.deviceId, deviceActor);
        deviceActor.forward(trackMsg, getContext());
      }
    } else {
      // Log the requested group ID, not this actor's own ID twice
      log.warning(
          "Ignoring TrackDevice request for {}. This actor is responsible for {}.",
          trackMsg.groupId,
          this.groupId);
    }
  }

  // Reply with the IDs of all devices currently tracked by this group
  private void onDeviceList(RequestDeviceList r) {
    getSender().tell(new ReplyDeviceList(r.requestId, deviceIdToActor.keySet()), getSelf());
  }

  private void onTerminated(Terminated t) {
    ActorRef deviceActor = t.getActor();
    String deviceId = actorToDeviceId.get(deviceActor);
    log.info("Device actor for {} has been terminated", deviceId);
    actorToDeviceId.remove(deviceActor);
    deviceIdToActor.remove(deviceId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(DeviceManager.RequestTrackDevice.class, this::onTrackDevice)
        .match(RequestDeviceList.class, this::onDeviceList)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}
We are almost ready to test the removal of devices. But we still need the following capabilities:
- To stop a device actor from our test case. From the outside, any actor can be stopped by sending the special built-in message PoisonPill, which instructs the actor to stop.
- To be notified once the device actor is stopped. We can use the Death Watch facility for this purpose, too. The TestProbe (Scala) or TestKit (Java) has two methods that we can easily use: watch() to watch a specific actor, and expectTerminated to assert that the watched actor has been terminated.
We add two more test cases now. In the first, we test that we get back the list of proper IDs once we have added a few devices. The second test case makes sure that the device ID is properly removed after the device actor has been stopped:
- Scala
"be able to list active devices" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}

"be able to list active devices after one shuts down" in {
  val probe = TestProbe()
  val groupActor = system.actorOf(DeviceGroup.props("group"))

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)
  val toShutDown = probe.lastSender

  groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
  probe.expectMsg(DeviceManager.DeviceRegistered)

  groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
  probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))

  probe.watch(toShutDown)
  toShutDown ! PoisonPill
  probe.expectTerminated(toShutDown)

  // using awaitAssert to retry because it might take longer for the groupActor
  // to see the Terminated, that order is undefined
  probe.awaitAssert {
    groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
    probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
  }
}
- Java
@Test
public void testListActiveDevices() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);
}

@Test
public void testListActiveDevicesAfterOneShutsDown() {
  TestKit probe = new TestKit(system);
  ActorRef groupActor = system.actorOf(DeviceGroup.props("group"));

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device1"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);
  ActorRef toShutDown = probe.getLastSender();

  groupActor.tell(new DeviceManager.RequestTrackDevice("group", "device2"), probe.getRef());
  probe.expectMsgClass(DeviceManager.DeviceRegistered.class);

  groupActor.tell(new DeviceGroup.RequestDeviceList(0L), probe.getRef());
  DeviceGroup.ReplyDeviceList reply = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
  assertEquals(0L, reply.requestId);
  assertEquals(Stream.of("device1", "device2").collect(Collectors.toSet()), reply.ids);

  probe.watch(toShutDown);
  toShutDown.tell(PoisonPill.getInstance(), ActorRef.noSender());
  probe.expectTerminated(toShutDown);

  // using awaitAssert to retry because it might take longer for the groupActor
  // to see the Terminated, that order is undefined
  probe.awaitAssert(
      () -> {
        groupActor.tell(new DeviceGroup.RequestDeviceList(1L), probe.getRef());
        DeviceGroup.ReplyDeviceList r = probe.expectMsgClass(DeviceGroup.ReplyDeviceList.class);
        assertEquals(1L, r.requestId);
        assertEquals(Stream.of("device2").collect(Collectors.toSet()), r.ids);
        return null;
      });
}
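The comment in the second test explains that the final check is wrapped in awaitAssert because the probe and the group actor receive their Terminated notifications in no defined order. The sketch below illustrates the general retry-until-deadline idea in plain Java, with no Akka dependency. It is not Akka's implementation: the class name RetryingAssertSketch, the helper's signature, and the timing values are assumptions made purely for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryingAssertSketch {

    /**
     * Hypothetical helper: re-runs the check until it passes or the deadline expires.
     * If time runs out, the last AssertionError is rethrown.
     */
    public static void awaitAssert(Runnable check, Duration max, Duration interval) {
        long deadline = System.nanoTime() + max.toNanos();
        while (true) {
            try {
                check.run();
                return; // the assertion held, so stop retrying
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // out of time: surface the most recent failure
                }
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting to retry", ie);
            }
        }
    }

    public static void main(String[] args) {
        // Simulates a condition that only becomes true on the third attempt,
        // much like a reply that changes once the group actor has seen Terminated.
        AtomicInteger attempts = new AtomicInteger();
        awaitAssert(
            () -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new AssertionError("not yet");
                }
            },
            Duration.ofSeconds(1),
            Duration.ofMillis(10));
        System.out.println("attempts needed: " + attempts.get());
    }
}
```

The point of the pattern is that a failing check is treated as "not yet" rather than "wrong" until the deadline passes, which is why each retry in the tests sends a fresh RequestDeviceList instead of waiting on the first reply.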
Creating device manager actors
Going up to the next level in our hierarchy, we need to create the entry point for our device manager component in the DeviceManager source file. This actor is very similar to the device group actor, but it creates device group actors instead of device actors:
- Scala
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }

object DeviceManager {
  def props(): Props = Props(new DeviceManager)

  final case class RequestTrackDevice(groupId: String, deviceId: String)
  case object DeviceRegistered
}

class DeviceManager extends Actor with ActorLogging {
  // Two maps kept in sync: lookup by group ID, and reverse lookup by actor on termination
  var groupIdToActor = Map.empty[String, ActorRef]
  var actorToGroupId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceManager started")

  override def postStop(): Unit = log.info("DeviceManager stopped")

  override def receive = {
    case trackMsg @ RequestTrackDevice(groupId, _) =>
      groupIdToActor.get(groupId) match {
        case Some(ref) =>
          ref.forward(trackMsg)
        case None =>
          log.info("Creating device group actor for {}", groupId)
          val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
          context.watch(groupActor)
          groupActor.forward(trackMsg)
          groupIdToActor += groupId -> groupActor
          actorToGroupId += groupActor -> groupId
      }

    case Terminated(groupActor) =>
      val groupId = actorToGroupId(groupActor)
      log.info("Device group actor for {} has been terminated", groupId)
      actorToGroupId -= groupActor
      groupIdToActor -= groupId
  }
}
- Java
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;

public class DeviceManager extends AbstractActor {
  private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

  public static Props props() {
    return Props.create(DeviceManager.class, DeviceManager::new);
  }

  public static final class RequestTrackDevice {
    public final String groupId;
    public final String deviceId;

    public RequestTrackDevice(String groupId, String deviceId) {
      this.groupId = groupId;
      this.deviceId = deviceId;
    }
  }

  public static final class DeviceRegistered {}

  // Two maps kept in sync: lookup by group ID, and reverse lookup by actor on termination
  final Map<String, ActorRef> groupIdToActor = new HashMap<>();
  final Map<ActorRef, String> actorToGroupId = new HashMap<>();

  @Override
  public void preStart() {
    log.info("DeviceManager started");
  }

  @Override
  public void postStop() {
    log.info("DeviceManager stopped");
  }

  private void onTrackDevice(RequestTrackDevice trackMsg) {
    String groupId = trackMsg.groupId;
    ActorRef ref = groupIdToActor.get(groupId);
    if (ref != null) {
      ref.forward(trackMsg, getContext());
    } else {
      log.info("Creating device group actor for {}", groupId);
      ActorRef groupActor = getContext().actorOf(DeviceGroup.props(groupId), "group-" + groupId);
      getContext().watch(groupActor);
      groupActor.forward(trackMsg, getContext());
      groupIdToActor.put(groupId, groupActor);
      actorToGroupId.put(groupActor, groupId);
    }
  }

  private void onTerminated(Terminated t) {
    ActorRef groupActor = t.getActor();
    String groupId = actorToGroupId.get(groupActor);
    log.info("Device group actor for {} has been terminated", groupId);
    actorToGroupId.remove(groupActor);
    groupIdToActor.remove(groupId);
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(RequestTrackDevice.class, this::onTrackDevice)
        .match(Terminated.class, this::onTerminated)
        .build();
  }
}
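The device manager keeps two maps because a Terminated notification carries only the actor reference, not the group ID, so a reverse lookup is needed to clean up the forward map. To show that bookkeeping in isolation, here is a minimal plain-Java sketch with no Akka dependency. The class GroupRegistrySketch and its methods are hypothetical, and a plain String handle stands in for an ActorRef.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class GroupRegistrySketch {
    // Forward index: group ID -> handle (a String stands in for an ActorRef here).
    private final Map<String, String> groupIdToHandle = new HashMap<>();
    // Reverse index: handle -> group ID, used when only the handle is known.
    private final Map<String, String> handleToGroupId = new HashMap<>();

    /** Returns the existing handle for the group, or creates and records a new one. */
    public String lookupOrCreate(String groupId) {
        String existing = groupIdToHandle.get(groupId);
        if (existing != null) {
            return existing;
        }
        String handle = "group-" + groupId;
        groupIdToHandle.put(groupId, handle);
        handleToGroupId.put(handle, groupId);
        return handle;
    }

    /** Mirrors the Terminated handler: removes the entry from both maps. */
    public void onTerminated(String handle) {
        String groupId = handleToGroupId.remove(handle);
        // Unlike the tutorial actor, this sketch silently ignores unknown handles.
        if (groupId != null) {
            groupIdToHandle.remove(groupId);
        }
    }

    /** Snapshot of the currently registered group IDs. */
    public Set<String> groupIds() {
        return new HashSet<>(groupIdToHandle.keySet());
    }

    public static void main(String[] args) {
        GroupRegistrySketch registry = new GroupRegistrySketch();
        String a = registry.lookupOrCreate("a");
        registry.lookupOrCreate("b");
        registry.lookupOrCreate("a"); // reuses the handle created above
        registry.onTerminated(a);     // removes "a" from both maps
        System.out.println(registry.groupIds());
    }
}
```

In the actor itself no locking is needed around these maps, because an actor processes one message at a time; the sketch is likewise single-threaded.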
We leave tests of the device manager as an exercise for you since it is very similar to the tests we have already written for the group actor.
What’s next?
We now have a hierarchical component for registering and tracking devices and recording measurements. We have seen how to implement different types of conversation patterns, such as:
- Request-respond (for temperature recordings)
- Delegate-respond (for registration of devices)
- Create-watch-terminate (for creating the group and device actors as children)
In the next chapter, we will introduce group query capabilities, which will establish a new conversation pattern of scatter-gather. In particular, we will implement the functionality that allows users to query the status of all the devices belonging to a group.