HowTo: Common Patterns

This section lists common actor patterns which have been found to be useful, elegant or instructive. Anything is welcome, example topics being message routing strategies, supervision patterns, restart handling, etc. As a special bonus, additions to this section are marked with the contributor’s name, and it would be nice if every Akka user who finds a recurring pattern in his or her code could share it for the profit of all. Where applicable it might also make sense to add to the akka.pattern package for creating an OTP-like library.

You might find some of the patterns described in the Scala chapter of this page useful even though the example code is written in Scala.

Scheduling Periodic Messages

See Actor Timers

Single-Use Actor Trees with High-Level Error Reporting

Contributed by: Rick Latrine

A nice way to enter the actor world from java is the use of Patterns.ask(). This method starts a temporary actor to forward the message and collect the result from the actor to be “asked”. In case of errors within the asked actor the default supervision handling will take over. The caller of Patterns.ask() will not be notified.

If that caller is interested in such an exception, they must make sure that the asked actor replies with Status.Failure(Throwable). Behind the asked actor a complex actor hierarchy might be spawned to accomplish asynchronous work. Then supervision is the established way to control error handling.

Unfortunately the asked actor must know about supervision and must catch the exceptions. Such an actor is unlikely to be reused in a different actor hierarchy and contains crippled try/catch blocks.

This pattern provides a way to encapsulate supervision and error propagation to the temporary actor. Finally the promise returned by Patterns.ask() is fulfilled as a failure, including the exception (see also Java 8 Compatibility for Java compatibility).

Let’s have a look at the example code:

source/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package jdocs.pattern;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import java.time.Duration;

import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.actor.AbstractActor;
import akka.pattern.Patterns;

public class SupervisedAsk {

  private static class AskParam {
    Props props;
    Object message;
    Duration timeout;

    AskParam(Props props, Object message, Duration timeout) {
      this.props = props;
      this.message = message;
      this.timeout = timeout;
    }
  }

  private static class AskTimeout {}

  public static class AskSupervisorCreator extends AbstractActor {

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(
              AskParam.class,
              message -> {
                ActorRef supervisor = getContext().actorOf(Props.create(AskSupervisor.class));
                supervisor.forward(message, getContext());
              })
          .build();
    }
  }

  public static class AskSupervisor extends AbstractActor {
    private ActorRef targetActor;
    private ActorRef caller;
    private AskParam askParam;
    private Cancellable timeoutMessage;

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return new OneForOneStrategy(
          0,
          Duration.ZERO,
          cause -> {
            caller.tell(new Status.Failure(cause), getSelf());
            return SupervisorStrategy.stop();
          });
    }

    @Override
    public Receive createReceive() {
      return receiveBuilder()
          .match(
              AskParam.class,
              message -> {
                askParam = message;
                caller = getSender();
                targetActor = getContext().actorOf(askParam.props);
                getContext().watch(targetActor);
                targetActor.forward(askParam.message, getContext());
                Scheduler scheduler = getContext().getSystem().scheduler();
                timeoutMessage =
                    scheduler.scheduleOnce(
                        askParam.timeout,
                        getSelf(),
                        new AskTimeout(),
                        getContext().getDispatcher(),
                        null);
              })
          .match(
              Terminated.class,
              message -> {
                Throwable ex = new ActorKilledException("Target actor terminated.");
                caller.tell(new Status.Failure(ex), getSelf());
                timeoutMessage.cancel();
                getContext().stop(getSelf());
              })
          .match(
              AskTimeout.class,
              message -> {
                Throwable ex =
                    new TimeoutException(
                        "Target actor timed out after " + askParam.timeout.toString());
                caller.tell(new Status.Failure(ex), getSelf());
                getContext().stop(getSelf());
              })
          .build();
    }
  }

  public static CompletionStage<Object> askOf(
      ActorRef supervisorCreator, Props props, Object message, Duration timeout) {
    AskParam param = new AskParam(props, message, timeout);
    return Patterns.ask(supervisorCreator, param, timeout);
  }

  public static synchronized ActorRef createSupervisorCreator(ActorRefFactory factory) {
    return factory.actorOf(Props.create(AskSupervisorCreator.class));
  }
}

In the askOf method the SupervisorCreator is sent the user message. The SupervisorCreator creates a SupervisorActor and forwards the message. This prevents the actor system from overloading due to actor creations. The SupervisorActor is responsible to create the user actor, forwards the message, handles actor termination and supervision. Additionally the SupervisorActor stops the user actor if execution time expired.

In case of an exception the supervisor tells the temporary actor which exception was thrown. Afterwards the actor hierarchy is stopped.

Finally we are able to execute an actor and receive the results or exceptions.

source/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package jdocs.pattern;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.AbstractActor;
import akka.util.Timeout;
import scala.concurrent.duration.FiniteDuration;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class SupervisedAskSpec {

  public Object execute(
      Class<? extends AbstractActor> someActor,
      Object message,
      Duration timeout,
      ActorRefFactory actorSystem)
      throws Exception {
    // example usage
    try {
      ActorRef supervisorCreator = SupervisedAsk.createSupervisorCreator(actorSystem);
      CompletionStage<Object> finished =
          SupervisedAsk.askOf(supervisorCreator, Props.create(someActor), message, timeout);
      return finished.toCompletableFuture().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (Exception e) {
      // exception propagated by supervision
      throw e;
    }
  }
}

Scalable Distributed Event Sourcing and CQRS

The Lagom Framework encodes many best practices in combining Akka Persistence and Akka Persistence Query with Cluster Sharding to build scalable and resilient systems with Event Sourcing and CQRS.

See Managing Data Persistence and Persistent Entity in the Lagom documentation.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.