Interaction Patterns

Dependency

To use Akka Actor Typed, add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor-typed_2.12:2.5.32"
}

Introduction

Interacting with an Actor in Akka Typed is done through an ActorRef[T] where T is the type of messages the actor accepts, also known as the “protocol”. This ensures that only the right kind of messages can be sent to an actor and also that no one else but the Actor itself can access the Actor instance internals.

Message exchange with Actors follows a few common patterns. Let’s go through each of them.

Fire and Forget

The fundamental way to interact with an actor is through “tell”, which is so common that it has a special symbolic method name: actorRef ! message. Sending a message with tell can safely be done from any thread.

Tell is asynchronous, which means that the method returns right away. When the statement after it is executed, there is no guarantee that the message has been processed by the recipient yet. It also means there is no way to know whether the message was received, or whether the processing succeeded or failed.

With the given protocol and actor behavior:

Scala
case class PrintMe(message: String)

val printerBehavior: Behavior[PrintMe] = Behaviors.receive {
  case (context, PrintMe(message)) =>
    context.log.info(message)
    Behaviors.same
}
Java
class PrintMe {
  public final String message;

  public PrintMe(String message) {
    this.message = message;
  }
}

static final Behavior<PrintMe> printerBehavior =
    Behaviors.receive(PrintMe.class)
        .onMessage(
            PrintMe.class,
            (context, printMe) -> {
              context.getLog().info(printMe.message);
              return Behaviors.same();
            })
        .build();

Fire and forget looks like this:

Scala
val system = ActorSystem(printerBehavior, "fire-and-forget-sample")

// note how the system is also the top level actor ref
val printer: ActorRef[PrintMe] = system

// these are all fire and forget
printer ! PrintMe("message 1")
printer ! PrintMe("not message 2")
Java
final ActorSystem<PrintMe> system =
    ActorSystem.create(printerBehavior, "printer-sample-system");

// note that system is also the ActorRef to the guardian actor
final ActorRef<PrintMe> ref = system;

// these are all fire and forget
ref.tell(new PrintMe("message 1"));
ref.tell(new PrintMe("message 2"));

Useful when:

  • It is not critical to be sure that the message was processed
  • There is no way to act on unsuccessful delivery or processing
  • We want to minimize the number of messages created to get higher throughput (sending a response would require creating twice the number of messages)

Problems:

  • If the inflow of messages is higher than the actor can process, the inbox will fill up and can, in the worst case, cause the JVM to crash with an OutOfMemoryError
  • If the message gets lost, the sender will not know

Request-Response

Many interactions between actors require one or more response messages to be sent back from the receiving actor. A response message can be the result of a query, some form of acknowledgment that the message was received and processed, or events that the request subscribed to.

In Akka Typed the recipient of responses has to be encoded as a field in the message itself, which the recipient can then use to send (tell) a response back.

With the following protocol:

Scala
case class Request(query: String, respondTo: ActorRef[Response])
case class Response(result: String)
Java
class Request {
  public final String query;
  public final ActorRef<Response> respondTo;

  public Request(String query, ActorRef<Response> respondTo) {
    this.query = query;
    this.respondTo = respondTo;
  }
}

class Response {
  public final String result;

  public Response(String result) {
    this.result = result;
  }
}

The sender would use its own ActorRef[Response], which it can access through ActorContext.self, for the respondTo.

Scala
otherActor ! Request("give me cookies", context.self)
Java
otherActor.tell(new Request("give me cookies", context.getSelf()));

On the receiving side the ActorRef[Response] can then be used to send one or more responses back:

Scala
val otherBehavior = Behaviors.receiveMessage[Request] {
  case Request(query, respondTo) =>
    // ... process query ...
    respondTo ! Response("Here's your cookies!")
    Behaviors.same
}
Java
// actor behavior
Behaviors.receive(Request.class)
    .onMessage(
        Request.class,
        (context, request) -> {
          // ... process request ...
          request.respondTo.tell(new Response("Here's your response!"));
          return Behaviors.same();
        })
    .build();

Useful when:

  • Subscribing to an actor that will send many response messages back

Problems:

  • Actors seldom have a response message from another actor as a part of their protocol (see adapted response)
  • It is hard to detect that a message request was not delivered or processed (see ask)
  • Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor (see ask or per session child actor)

Adapted Response

Most often the sending actor does not, and should not, support receiving the response messages of another actor. In such cases we need to provide an ActorRef of the right type and adapt the response message to a type that the sending actor can handle.

Scala
object Backend {
  sealed trait Request
  final case class StartTranslationJob(taskId: Int, site: URI, replyTo: ActorRef[Response])
      extends Request

  sealed trait Response
  final case class JobStarted(taskId: Int) extends Response
  final case class JobProgress(taskId: Int, progress: Double) extends Response
  final case class JobCompleted(taskId: Int, result: URI) extends Response
}

object Frontend {

  sealed trait Command
  final case class Translate(site: URI, replyTo: ActorRef[URI]) extends Command
  private final case class WrappedBackendResponse(response: Backend.Response) extends Command

  def translator(backend: ActorRef[Backend.Request]): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val backendResponseMapper: ActorRef[Backend.Response] =
        context.messageAdapter(rsp => WrappedBackendResponse(rsp))

      def active(inProgress: Map[Int, ActorRef[URI]], count: Int): Behavior[Command] = {
        Behaviors.receiveMessage[Command] {
          case Translate(site, replyTo) =>
            val taskId = count + 1
            backend ! Backend.StartTranslationJob(taskId, site, backendResponseMapper)
            active(inProgress.updated(taskId, replyTo), taskId)

          case wrapped: WrappedBackendResponse =>
            wrapped.response match {
              case Backend.JobStarted(taskId) =>
                context.log.info("Started {}", taskId)
                Behaviors.same
              case Backend.JobProgress(taskId, progress) =>
                context.log.info("Progress {}: {}", taskId, progress)
                Behaviors.same
              case Backend.JobCompleted(taskId, result) =>
                context.log.info("Completed {}: {}", taskId, result)
                inProgress(taskId) ! result
                active(inProgress - taskId, count)
            }
        }
      }

      active(inProgress = Map.empty, count = 0)
    }
}
Java
public static class Backend {
  interface Request {}

  public static class StartTranslationJob implements Request {
    public final int taskId;
    public final URI site;
    public final ActorRef<Response> replyTo;

    public StartTranslationJob(int taskId, URI site, ActorRef<Response> replyTo) {
      this.taskId = taskId;
      this.site = site;
      this.replyTo = replyTo;
    }
  }

  interface Response {}

  public static class JobStarted implements Response {
    public final int taskId;

    public JobStarted(int taskId) {
      this.taskId = taskId;
    }
  }

  public static class JobProgress implements Response {
    public final int taskId;
    public final double progress;

    public JobProgress(int taskId, double progress) {
      this.taskId = taskId;
      this.progress = progress;
    }
  }

  public static class JobCompleted implements Response {
    public final int taskId;
    public final URI result;

    public JobCompleted(int taskId, URI result) {
      this.taskId = taskId;
      this.result = result;
    }
  }
}

public static class Frontend {

  interface Command {}

  public static class Translate implements Command {
    public final URI site;
    public final ActorRef<URI> replyTo;

    public Translate(URI site, ActorRef<URI> replyTo) {
      this.site = site;
      this.replyTo = replyTo;
    }
  }

  private static class WrappedJobStarted implements Command {
    final Backend.JobStarted response;

    public WrappedJobStarted(Backend.JobStarted response) {
      this.response = response;
    }
  }

  private static class WrappedJobProgress implements Command {
    final Backend.JobProgress response;

    public WrappedJobProgress(Backend.JobProgress response) {
      this.response = response;
    }
  }

  private static class WrappedJobCompleted implements Command {
    final Backend.JobCompleted response;

    public WrappedJobCompleted(Backend.JobCompleted response) {
      this.response = response;
    }
  }

  private static class OtherResponse implements Command {
    final Backend.Response response;

    public OtherResponse(Backend.Response response) {
      this.response = response;
    }
  }

  public static class Translator extends AbstractBehavior<Command> {
    private final ActorContext<Command> context;
    private final ActorRef<Backend.Request> backend;
    private final ActorRef<Backend.Response> backendResponseAdapter;

    private int taskIdCounter = 0;
    private Map<Integer, ActorRef<URI>> inProgress = new HashMap<>();

    public Translator(ActorContext<Command> context, ActorRef<Backend.Request> backend) {
      this.context = context;
      this.backend = backend;
      this.backendResponseAdapter =
          context.messageAdapter(
              Backend.Response.class,
              rsp -> {
                if (rsp instanceof Backend.JobStarted)
                  return new WrappedJobStarted((Backend.JobStarted) rsp);
                else if (rsp instanceof Backend.JobProgress)
                  return new WrappedJobProgress((Backend.JobProgress) rsp);
                else if (rsp instanceof Backend.JobCompleted)
                  return new WrappedJobCompleted((Backend.JobCompleted) rsp);
                else return new OtherResponse(rsp);
              });
    }

    @Override
    public Receive<Command> createReceive() {
      return newReceiveBuilder()
          .onMessage(
              Translate.class,
              cmd -> {
                taskIdCounter += 1;
                inProgress.put(taskIdCounter, cmd.replyTo);
                backend.tell(
                    new Backend.StartTranslationJob(
                        taskIdCounter, cmd.site, backendResponseAdapter));
                return this;
              })
          .onMessage(
              WrappedJobStarted.class,
              wrapped -> {
                context.getLog().info("Started {}", wrapped.response.taskId);
                return this;
              })
          .onMessage(
              WrappedJobProgress.class,
              wrapped -> {
                context
                    .getLog()
                    .info("Progress {}: {}", wrapped.response.taskId, wrapped.response.progress);
                return this;
              })
          .onMessage(
              WrappedJobCompleted.class,
              wrapped -> {
                context
                    .getLog()
                    .info("Completed {}: {}", wrapped.response.taskId, wrapped.response.result);
                // as in the Scala sample: reply to the original requester
                // and forget the finished task
                ActorRef<URI> replyTo = inProgress.remove(wrapped.response.taskId);
                if (replyTo != null) {
                  replyTo.tell(wrapped.response.result);
                }
                return this;
              })
          .onMessage(OtherResponse.class, other -> Behaviors.unhandled())
          .build();
    }
  }
}

You can register several message adapters for different message classes. It’s only possible to have one message adapter per message class, which ensures that the number of adapters does not grow unbounded if they are registered repeatedly. That also means that a registered adapter will replace an existing adapter for the same message class.

A message adapter will be used if the message class matches the given class or is a subclass thereof. The registered adapters are tried in reverse order of their registration order, i.e. the last registered first.
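
The lookup rules above can be modelled without Akka. The following plain-Java sketch is only an illustration of those rules, not Akka's implementation; AdapterRegistry, register, adapt and size are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java model of the adapter lookup rules described above.
// Hypothetical names; this is not Akka's implementation.
final class AdapterRegistry<T> {

  private static final class Entry<R> {
    final Class<?> messageClass;
    final Function<Object, R> adapter;

    Entry(Class<?> messageClass, Function<Object, R> adapter) {
      this.messageClass = messageClass;
      this.adapter = adapter;
    }
  }

  // Newest registration is kept at the front of the list.
  private final List<Entry<T>> entries = new ArrayList<>();

  <U> void register(Class<U> messageClass, Function<U, T> adapter) {
    // At most one adapter per class: a new registration replaces the old one.
    entries.removeIf(e -> e.messageClass.equals(messageClass));
    Function<Object, T> erased = msg -> adapter.apply(messageClass.cast(msg));
    entries.add(0, new Entry<T>(messageClass, erased));
  }

  Optional<T> adapt(Object message) {
    // Tried in reverse registration order; a class or any subclass matches.
    for (Entry<T> e : entries) {
      if (e.messageClass.isInstance(message)) {
        return Optional.of(e.adapter.apply(message));
      }
    }
    return Optional.empty();
  }

  int size() {
    return entries.size();
  }
}
```

In this model, registering an adapter for Number and then one for Integer means an Integer message is adapted by the Integer adapter, because it was registered last, while a Double still falls through to the Number adapter. Registering them in the opposite order would let the Number adapter shadow the Integer one.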

A message adapter (and the returned ActorRef) has the same lifecycle as the receiving actor. It’s recommended to register the adapters in a top level Behaviors.setup or constructor of AbstractBehavior, but it’s also possible to register them later if needed.

The adapter function runs in the receiving actor and can safely access its state, but if it throws an exception the actor is stopped.

Useful when:

  • Translating between different actor message protocols
  • Subscribing to an actor that will send many response messages back

Problems:

  • It is hard to detect that a message request was not delivered or processed (see ask)
  • Only one adaptation can be made per response message type. If a new one is registered the old one is replaced, so different target actors can’t have different adaptations if they use the same response types, unless some correlation is encoded in the messages
  • Unless the protocol already includes a way to provide context, for example a request id that is also sent in the response, it is not possible to tie an interaction to some specific context without introducing a new, separate, actor

Request-Response with ask between two actors

In an interaction where there is a 1:1 mapping between a request and a response we can use ask on the ActorContext to interact with another actor.

The interaction has two steps. First we need to construct the outgoing message, and to do that we need an ActorRef[Response] to put as recipient in the outgoing message. The second step is to transform the successful Response or failure into a message that is part of the protocol of the sending actor.

Scala
sealed trait HalCommand
case class OpenThePodBayDoorsPlease(respondTo: ActorRef[HalResponse]) extends HalCommand
case class HalResponse(message: String)

val halBehavior = Behaviors.receiveMessage[HalCommand] {
  case OpenThePodBayDoorsPlease(respondTo) =>
    respondTo ! HalResponse("I'm sorry, Dave. I'm afraid I can't do that.")
    Behaviors.same
}

sealed trait DaveMessage
// this is a part of the protocol that is internal to the actor itself
case class AdaptedResponse(message: String) extends DaveMessage

def daveBehavior(hal: ActorRef[HalCommand]) = Behaviors.setup[DaveMessage] { context =>
  // asking someone requires a timeout, if the timeout hits without response
  // the ask is failed with a TimeoutException
  implicit val timeout: Timeout = 3.seconds

  // Note: The second parameter list takes a function `ActorRef[T] => Message`.
  // As OpenThePodBayDoorsPlease is a case class, it has a factory apply method,
  // which is what we are passing as the second parameter here. It could also be
  // written as `ref => OpenThePodBayDoorsPlease(ref)`
  context.ask(hal)(OpenThePodBayDoorsPlease) {
    case Success(HalResponse(message)) => AdaptedResponse(message)
    case Failure(ex)                   => AdaptedResponse("Request failed")
  }

  // we can also tie in request context into an interaction, it is safe to look at
  // actor internal state from the transformation function, but remember that it may have
  // changed at the time the response arrives and the transformation is done, best is to
  // use immutable state we have closed over like here.
  val requestId = 1
  context.ask(hal)(OpenThePodBayDoorsPlease) {
    case Success(HalResponse(message)) => AdaptedResponse(s"$requestId: $message")
    case Failure(ex)                   => AdaptedResponse(s"$requestId: Request failed")
  }

  Behaviors.receiveMessage {
    // the adapted message ends up being processed like any other
    // message sent to the actor
    case AdaptedResponse(message) =>
      context.log.info("Got response from hal: {}", message)
      Behaviors.same
  }
}
Java
interface HalCommand {}

static final class OpenThePodBayDoorsPlease implements HalCommand {
  public final ActorRef<HalResponse> respondTo;

  OpenThePodBayDoorsPlease(ActorRef<HalResponse> respondTo) {
    this.respondTo = respondTo;
  }
}

static final class HalResponse {
  public final String message;

  HalResponse(String message) {
    this.message = message;
  }
}

static final Behavior<HalCommand> halBehavior =
    Behaviors.receive(HalCommand.class)
        .onMessage(
            OpenThePodBayDoorsPlease.class,
            (context, message) -> {
              message.respondTo.tell(
                  new HalResponse("I'm sorry, Dave. I'm afraid I can't do that."));
              return Behaviors.same();
            })
        .build();

interface DaveProtocol {}

// this is a part of the protocol that is internal to the actor itself
private static final class AdaptedResponse implements DaveProtocol {
  public final String message;

  public AdaptedResponse(String message) {
    this.message = message;
  }
}

public static Behavior<DaveProtocol> daveBehavior(final ActorRef<HalCommand> hal) {
  return Behaviors.setup(
      (ActorContext<DaveProtocol> context) -> {

        // asking someone requires a timeout, if the timeout hits without response
        // the ask is failed with a TimeoutException
        final Duration timeout = Duration.ofSeconds(3);

        context.ask(
            HalResponse.class,
            hal,
            timeout,
            // construct the outgoing message
            (ActorRef<HalResponse> ref) -> new OpenThePodBayDoorsPlease(ref),
            // adapt the response (or failure to respond)
            (response, throwable) -> {
              if (response != null) {
                return new AdaptedResponse(response.message);
              } else {
                return new AdaptedResponse("Request failed");
              }
            });

        // we can also tie in request context into an interaction, it is safe to look at
        // actor internal state from the transformation function, but remember that it may have
        // changed at the time the response arrives and the transformation is done, best is to
        // use immutable state we have closed over like here.
        final int requestId = 1;
        context.ask(
            HalResponse.class,
            hal,
            timeout,
            // construct the outgoing message
            (ActorRef<HalResponse> ref) -> new OpenThePodBayDoorsPlease(ref),
            // adapt the response (or failure to respond)
            (response, throwable) -> {
              if (response != null) {
                return new AdaptedResponse(requestId + ": " + response.message);
              } else {
                return new AdaptedResponse(requestId + ": Request failed");
              }
            });

        return Behaviors.receive(DaveProtocol.class)
            // the adapted message ends up being processed like any other
            // message sent to the actor
            .onMessage(
                AdaptedResponse.class,
                (innerCtx, response) -> {
                  innerCtx.getLog().info("Got response from HAL: {}", response.message);
                  return Behaviors.same();
                })
            .build();
      });
}

The response adapting function runs in the receiving actor and can safely access its state, but if it throws an exception the actor is stopped.

Useful when:

  • Single response queries
  • An actor needs to know that the message was processed before continuing
  • To allow an actor to resend if a timely response is not produced
  • To keep track of outstanding requests and not overwhelm a recipient with messages (“backpressure”)
  • Context should be attached to the interaction but the protocol does not support that (request id, what query the response was for)

Problems:

  • There can only be a single response to one ask (see per session child Actor)
  • When ask times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
  • Finding a good value for the timeout, especially when ask triggers chained asks in the receiving actor. You want a short timeout to be responsive and answer back to the requester, but at the same time you do not want to have many false positives

Request-Response with ask from outside an Actor

Sometimes you need to interact with actors from outside of the actor system. This can be done with fire-and-forget as described above, or through another version of ask that returns a Future[Response] that is either completed with a successful response or failed with a TimeoutException if there was no response within the specified timeout.

To do this we use ActorRef.ask (or the symbolic ActorRef.?) implicitly provided by akka.actor.typed.scaladsl.AskPattern to send a message to an actor and get a Future[Response] back.

Scala
trait CookieCommand {}
case class GiveMeCookies(replyTo: ActorRef[Cookies]) extends CookieCommand
case class Cookies(count: Int)

import akka.actor.typed.scaladsl.AskPattern._

// asking someone requires a timeout and a scheduler, if the timeout hits without response
// the ask is failed with a TimeoutException
implicit val timeout: Timeout = 3.seconds
implicit val scheduler = system.scheduler

val result: Future[Cookies] = cookieActorRef.ask(ref => GiveMeCookies(ref))

// the response callback will be executed on this execution context
implicit val ec = system.executionContext

result.onComplete {
  case Success(cookies) => println("Yay, cookies!")
  case Failure(ex)      => println("Boo! didn't get cookies in time.")
}
Java
interface CookieCommand {}

static class GiveMeCookies implements CookieCommand {
  public final ActorRef<Cookies> replyTo;

  GiveMeCookies(ActorRef<Cookies> replyTo) {
    this.replyTo = replyTo;
  }
}

static class Cookies {}

public void askAndPrint(ActorSystem<Object> system, ActorRef<CookieCommand> cookieActorRef) {
  CompletionStage<Cookies> result =
      AskPattern.ask(
          cookieActorRef,
          GiveMeCookies::new,
          // asking someone requires a timeout and a scheduler, if the timeout hits
          // without response the ask is failed with a TimeoutException
          Duration.ofSeconds(3),
          system.scheduler());

  result.whenComplete(
      (cookies, failure) -> {
        if (cookies != null) System.out.println("Yay, cookies!");
        else System.out.println("Boo! didn't get cookies in time.");
      });
}

Useful when:

  • Querying an actor from outside of the actor system

Problems:

  • It is easy to accidentally close over and unsafely mutate state with the callbacks on the returned Future, as those will be executed on a different thread
  • There can only be a single response to one ask (see per session child Actor)
  • When ask times out, the receiving actor does not know and may still process it to completion, or even start processing it after the fact
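
One way to avoid the first problem is to let the callback do nothing but hand the result over as a message, and to touch mutable state only from the single thread that processes those messages. The plain-Java sketch below models that idea with a CompletableFuture and a thread-safe queue standing in for a mailbox. CookieCounter, track and drain are hypothetical names, and none of this is Akka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Library-free model: callbacks only enqueue, state is mutated in one place.
// Hypothetical names; not part of Akka.
final class CookieCounter {

  // Thread-safe "mailbox": the only thing a callback on another thread touches.
  private final ConcurrentLinkedQueue<Integer> mailbox = new ConcurrentLinkedQueue<>();

  // Plain mutable state, only read and written by the thread calling drain().
  private int totalCookies = 0;

  // The callback does not close over totalCookies, it just forwards the result.
  void track(CompletableFuture<Integer> reply) {
    reply.thenAccept(mailbox::add);
  }

  // Processes queued results one at a time and returns the running total.
  int drain() {
    Integer count;
    while ((count = mailbox.poll()) != null) {
      totalCookies += count;
    }
    return totalCookies;
  }
}
```

The unsafe variant would be a callback such as `reply.thenAccept(n -> totalCookies += n)`, which may run on whichever thread completes the future.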

Per session child Actor

In some cases a complete response to a request can only be created and sent back after collecting multiple answers from other actors. For these kinds of interaction it can be good to delegate the work to a per “session” child actor. The child could also contain arbitrary logic to implement retrying, failing on timeout, tail chopping, progress inspection etc.

Note that this is in fact essentially how ask is implemented. If all you need is a single response with a timeout, it is better to use ask.

The child is created with the context it needs to do the work, including an ActorRef that it can respond to. When the complete result is there the child responds with the result and stops itself.

As the protocol of the session actor is not a public API but rather an implementation detail of the parent actor, it may not always make sense to have an explicit protocol and adapt the messages of the actors that the session actor interacts with. For this use case it is possible to express that the actor can receive any message (the samples below use AnyRef in Scala and Object in Java).

Scala
// dummy data types just for this sample
case class Keys()
case class Wallet()

// messages for the two services we interact with
trait HomeCommand
case class LeaveHome(who: String, respondTo: ActorRef[ReadyToLeaveHome]) extends HomeCommand
case class ReadyToLeaveHome(who: String, keys: Keys, wallet: Wallet)

case class GetKeys(whoseKeys: String, respondTo: ActorRef[Keys])
case class GetWallet(whoseWallet: String, respondTo: ActorRef[Wallet])

def homeBehavior: Behavior[HomeCommand] = Behaviors.setup[HomeCommand] { context =>
  // spawn the two services once, when the actor starts, rather than on every
  // message (a second spawn with the same name would fail)
  val keyCabinet: ActorRef[GetKeys] = context.spawn(keyCabinetBehavior, "key-cabinet")
  val drawer: ActorRef[GetWallet] = context.spawn(drawerBehavior, "drawer")

  Behaviors.receiveMessage[HomeCommand] {
    case LeaveHome(who, respondTo) =>
      context.spawn(prepareToLeaveHome(who, respondTo, keyCabinet, drawer), s"leaving-$who")
      Behaviors.same
  }
}

// per session actor behavior
def prepareToLeaveHome(
    whoIsLeaving: String,
    respondTo: ActorRef[ReadyToLeaveHome],
    keyCabinet: ActorRef[GetKeys],
    drawer: ActorRef[GetWallet]): Behavior[NotUsed] =
  // we don't _really_ care about the actor protocol here as nobody will send us
  // messages except for responses to our queries, so we just accept any kind of message
  // but narrow that to more limited types when we interact
  Behaviors
    .setup[AnyRef] { context =>
      var wallet: Option[Wallet] = None
      var keys: Option[Keys] = None

      // we narrow the ActorRef type to any subtype of the actual type we accept
      keyCabinet ! GetKeys(whoIsLeaving, context.self.narrow[Keys])
      drawer ! GetWallet(whoIsLeaving, context.self.narrow[Wallet])

      def nextBehavior: Behavior[AnyRef] =
        (keys, wallet) match {
          case (Some(k), Some(w)) =>
            // we got both, "session" is completed!
            respondTo ! ReadyToLeaveHome(whoIsLeaving, k, w)
            Behaviors.stopped

          case _ =>
            Behaviors.same
        }

      Behaviors.receiveMessage {
        case w: Wallet =>
          wallet = Some(w)
          nextBehavior
        case k: Keys =>
          keys = Some(k)
          nextBehavior
        case _ =>
          Behaviors.unhandled
      }
    }
    .narrow[NotUsed] // we don't let anyone else know we accept anything
Java
// dummy data types just for this sample
interface Keys {}

interface Wallet {}
// messages for the two services we interact with
class GetKeys {
  public final String whoseKeys;
  public final ActorRef<Keys> respondTo;

  public GetKeys(String whoseKeys, ActorRef<Keys> respondTo) {
    this.whoseKeys = whoseKeys;
    this.respondTo = respondTo;
  }
}

class GetWallet {
  public final String whoseWallet;
  public final ActorRef<Wallet> respondTo;

  public GetWallet(String whoseWallet, ActorRef<Wallet> respondTo) {
    this.whoseWallet = whoseWallet;
    this.respondTo = respondTo;
  }
}

interface HomeCommand {}

class LeaveHome implements HomeCommand {
  public final String who;
  public final ActorRef<ReadyToLeaveHome> respondTo;

  public LeaveHome(String who, ActorRef<ReadyToLeaveHome> respondTo) {
    this.who = who;
    this.respondTo = respondTo;
  }
}

class ReadyToLeaveHome {
  public final String who;
  public final Keys keys;
  public final Wallet wallet;

  public ReadyToLeaveHome(String who, Keys keys, Wallet wallet) {
    this.who = who;
    this.keys = keys;
    this.wallet = wallet;
  }
}

// actor behavior
public Behavior<HomeCommand> homeBehavior() {
  return Behaviors.setup(
      (context) -> {
        final ActorRef<GetKeys> keyCabinet = context.spawn(keyCabinetBehavior, "key-cabinet");
        final ActorRef<GetWallet> drawer = context.spawn(drawerBehavior, "drawer");

        return Behaviors.receive(HomeCommand.class)
            .onMessage(
                LeaveHome.class,
                (innerCtx, message) -> {
                  context.spawn(
                      new PrepareToLeaveHome(message.who, message.respondTo, keyCabinet, drawer),
                      "leaving" + message.who);
                  return Behavior.same();
                })
            .build();
      });
}

// per session actor behavior
class PrepareToLeaveHome extends AbstractBehavior<Object> {
  private final String whoIsLeaving;
  private final ActorRef<ReadyToLeaveHome> respondTo;
  private final ActorRef<GetKeys> keyCabinet;
  private final ActorRef<GetWallet> drawer;
  private Optional<Wallet> wallet = Optional.empty();
  private Optional<Keys> keys = Optional.empty();

  public PrepareToLeaveHome(
      String whoIsLeaving,
      ActorRef<ReadyToLeaveHome> respondTo,
      ActorRef<GetKeys> keyCabinet,
      ActorRef<GetWallet> drawer) {
    this.whoIsLeaving = whoIsLeaving;
    this.respondTo = respondTo;
    this.keyCabinet = keyCabinet;
    this.drawer = drawer;
  }

  @Override
  public Receive<Object> createReceive() {
    return newReceiveBuilder()
        .onMessage(
            Wallet.class,
            (wallet) -> {
              this.wallet = Optional.of(wallet);
              return completeOrContinue();
            })
        .onMessage(
            Keys.class,
            (keys) -> {
              this.keys = Optional.of(keys);
              return completeOrContinue();
            })
        .build();
  }

  private Behavior<Object> completeOrContinue() {
    if (wallet.isPresent() && keys.isPresent()) {
      respondTo.tell(new ReadyToLeaveHome(whoIsLeaving, keys.get(), wallet.get()));
      return Behaviors.stopped();
    } else {
      return this;
    }
  }
}

In an actual session child you would likely want to include some form of timeout as well (see scheduling messages to self).

Useful when:

  • A single incoming request should result in multiple interactions with other actors before a result can be built, for example aggregation of several results
  • You need to handle acknowledgement and retry messages for at-least-once delivery

Problems:

  • Children have life cycles that must be managed to avoid a resource leak. It can be easy to miss a scenario where the session actor is not stopped
  • It increases complexity, since each such child can execute concurrently with other children and the parent

Scheduling messages to self

The following example demonstrates how to use timers to schedule messages to an actor.

The Buncher actor buffers a burst of incoming messages and delivers them as a batch after a timeout or when the number of batched messages reaches a maximum size.

Scala
case object TimerKey

trait Msg
case class ExcitingMessage(message: String) extends Msg
final case class Batch(messages: Vector[Msg])
case object Timeout extends Msg

def behavior(target: ActorRef[Batch], after: FiniteDuration, maxSize: Int): Behavior[Msg] = {
  Behaviors.withTimers(timers => idle(timers, target, after, maxSize))
}

def idle(
    timers: TimerScheduler[Msg],
    target: ActorRef[Batch],
    after: FiniteDuration,
    maxSize: Int): Behavior[Msg] = {
  Behaviors.receiveMessage[Msg] { message =>
    timers.startSingleTimer(TimerKey, Timeout, after)
    active(Vector(message), timers, target, after, maxSize)
  }
}

def active(
    buffer: Vector[Msg],
    timers: TimerScheduler[Msg],
    target: ActorRef[Batch],
    after: FiniteDuration,
    maxSize: Int): Behavior[Msg] = {
  Behaviors.receiveMessage[Msg] {
    case Timeout =>
      target ! Batch(buffer)
      idle(timers, target, after, maxSize)
    case m =>
      val newBuffer = buffer :+ m
      if (newBuffer.size == maxSize) {
        timers.cancel(TimerKey)
        target ! Batch(newBuffer)
        idle(timers, target, after, maxSize)
      } else
        active(newBuffer, timers, target, after, maxSize)
  }
}
Java
interface Msg {}

public static final class Batch {
  private final List<Msg> messages;

  public Batch(List<Msg> messages) {
    this.messages = Collections.unmodifiableList(messages);
  }

  public List<Msg> getMessages() {
    return messages;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    Batch batch = (Batch) o;
    return Objects.equals(messages, batch.messages);
  }

  @Override
  public int hashCode() {
    return Objects.hash(messages);
  }
}

public static final class ExcitingMessage implements Msg {
  private final String message;

  public ExcitingMessage(String message) {
    this.message = message;
  }
}

private static final Object TIMER_KEY = new Object();

private static class TimeoutMsg implements Msg {}

public static Behavior<Msg> behavior(ActorRef<Batch> target, Duration after, int maxSize) {
  return Behaviors.withTimers(timers -> idle(timers, target, after, maxSize));
}

private static Behavior<Msg> idle(
    TimerScheduler<Msg> timers, ActorRef<Batch> target, Duration after, int maxSize) {
  return Behaviors.receive(Msg.class)
      .onMessage(
          Msg.class,
          (context, message) -> {
            timers.startSingleTimer(TIMER_KEY, new TimeoutMsg(), after);
            List<Msg> buffer = new ArrayList<>();
            buffer.add(message);
            return active(buffer, timers, target, after, maxSize);
          })
      .build();
}

private static Behavior<Msg> active(
    List<Msg> buffer,
    TimerScheduler<Msg> timers,
    ActorRef<Batch> target,
    Duration after,
    int maxSize) {
  return Behaviors.receive(Msg.class)
      .onMessage(
          TimeoutMsg.class,
          (context, message) -> {
            target.tell(new Batch(buffer));
            return idle(timers, target, after, maxSize);
          })
      .onMessage(
          Msg.class,
          (context, message) -> {
            buffer.add(message);
            if (buffer.size() == maxSize) {
              timers.cancel(TIMER_KEY);
              target.tell(new Batch(buffer));
              return idle(timers, target, after, maxSize);
            } else {
              return active(buffer, timers, target, after, maxSize);
            }
          })
      .build();
}

There are a few things worth noting here:

  • To get access to the timers you start with Behaviors.withTimers that will pass a TimerScheduler instance to the function. This can be used with any type of Behavior, including receive, receiveMessage, but also setup or any other behavior.
  • Each timer has a key, and if a new timer with the same key is started the previous one is cancelled. It’s guaranteed that a message from the previous timer is not received, even though it might already be enqueued in the mailbox when the new timer is started.
  • Both periodic and single message timers are supported.
  • The TimerScheduler is mutable in itself, because it performs and manages the side effects of registering the scheduled tasks.
  • The TimerScheduler is bound to the lifecycle of the actor that owns it and it’s cancelled automatically when the actor is stopped.
  • Behaviors.withTimers can also be used inside Behaviors.supervise and it will automatically cancel the started timers correctly when the actor is restarted, so that the new incarnation will not receive scheduled messages from the previous incarnation.
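
The guarantee about timers that share a key can be pictured with a generation counter. The plain-Java sketch below shows one way such a guarantee could be provided, under the assumption that every scheduled message is tagged with a per-key generation that is checked on receipt. It is not taken from Akka's source, and TimerGenerations, TimerMsg, start, cancel and receive are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of dropping stale single-shot timer messages.
// Hypothetical names; this is a sketch, not Akka's TimerScheduler.
final class TimerGenerations {

  // A timer message as it would sit in the mailbox, tagged with its generation.
  static final class TimerMsg {
    final Object key;
    final int generation;
    final Object payload;

    TimerMsg(Object key, int generation, Object payload) {
      this.key = key;
      this.generation = generation;
      this.payload = payload;
    }
  }

  // The generation that is currently valid for each key.
  private final Map<Object, Integer> liveGeneration = new HashMap<>();
  private int nextGeneration = 0;

  // Starting a timer invalidates any earlier timer with the same key.
  TimerMsg start(Object key, Object payload) {
    nextGeneration += 1;
    liveGeneration.put(key, nextGeneration);
    return new TimerMsg(key, nextGeneration, payload);
  }

  void cancel(Object key) {
    liveGeneration.remove(key);
  }

  // Called when a TimerMsg is taken from the mailbox.
  // Returns the payload to deliver, or null if the message is stale.
  Object receive(TimerMsg msg) {
    Integer live = liveGeneration.get(msg.key);
    if (live == null || live.intValue() != msg.generation) {
      return null;
    }
    liveGeneration.remove(msg.key); // single-shot timer: done after delivery
    return msg.payload;
  }
}
```

With this scheme a message from a replaced or cancelled timer can still be physically present in the mailbox, but it is discarded when it is dequeued instead of being handed to the behavior.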