HowTo: Common Patterns - Version 2.4.17

This section lists common actor patterns which have been found to be useful, elegant or instructive. Anything is welcome; example topics include message routing strategies, supervision patterns and restart handling. As a special bonus, additions to this section are marked with the contributor's name, and it would be nice if every Akka user who finds a recurring pattern in their code could share it for the benefit of all. Where applicable it might also make sense to add the pattern to the akka.pattern package in order to build an OTP-like library.

You might find some of the patterns described in the Scala chapter of HowTo: Common Patterns useful even though the example code is written in Scala.

Scheduling Periodic Messages

This pattern describes how to schedule periodic messages to yourself in two different ways.

The first way is to set up periodic message scheduling in the constructor of the actor and to cancel the scheduled send in postStop. Without the cancellation, a restart could leave multiple periodic sends registered for the same actor.

Note

With this approach the scheduled periodic message send is restarted together with the actor. As a result, the time that elapses between two tick messages around a restart may drift, depending on when the send is rescheduled relative to the last tick and on the length of the initial delay. In the worst case the gap is interval plus initialDelay.

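import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

import akka.actor.Cancellable;
import akka.actor.UntypedActor;

public class ScheduleInConstructor extends UntypedActor {

  // first tick after 500 ms, then one tick every second
  private final Cancellable tick = getContext().system().scheduler().schedule(
    Duration.create(500, TimeUnit.MILLISECONDS),
    Duration.create(1, TimeUnit.SECONDS),
    getSelf(), "tick", getContext().dispatcher(), null);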

  @Override
  public void postStop() {
    tick.cancel();
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message.equals("tick")) {
      // do something useful here
    }
    else {
      unhandled(message);
    }
  }
}
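The worst case mentioned in the note above can be made concrete with the values used in ScheduleInConstructor. The following is a small sketch using only java.time; the class and method names are hypothetical and not part of Akka. It assumes that a restart cancels the old schedule just before its next tick would have fired, and that the new schedule then waits for its full initial delay.

```java
import java.time.Duration;

class TickDriftExample {

  // Upper bound on the gap between two ticks around a restart:
  // almost one full interval has passed since the last tick, and the
  // re-created schedule then waits for its whole initial delay.
  static Duration worstCaseGap(Duration initialDelay, Duration interval) {
    return interval.plus(initialDelay);
  }

  public static void main(String[] args) {
    Duration initialDelay = Duration.ofMillis(500); // as in ScheduleInConstructor
    Duration interval = Duration.ofSeconds(1);      // as in ScheduleInConstructor
    System.out.println(worstCaseGap(initialDelay, interval).toMillis() + " ms"); // prints "1500 ms"
  }
}
```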

The second variant sets up an initial one-shot message send in the preStart method of the actor. When the actor receives this message, it sets up a new one-shot message send. You also have to override postRestart so that preStart is not called again on restart, which would schedule the initial message send a second time.

Note

With this approach we won't fill up the mailbox with tick messages if the actor is under pressure, but only schedule a new tick message when we have seen the previous one.

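import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

import akka.actor.UntypedActor;

public class ScheduleInReceive extends UntypedActor {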

  @Override
  public void preStart() {
    getContext().system().scheduler().scheduleOnce(
      Duration.create(500, TimeUnit.MILLISECONDS),
      getSelf(), "tick", getContext().dispatcher(), null);
  }

  // override postRestart so we don't call preStart and schedule a new message
  @Override
  public void postRestart(Throwable reason) {
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message.equals("tick")) {
      // send another periodic tick after the specified delay
      getContext().system().scheduler().scheduleOnce(
        Duration.create(1, TimeUnit.SECONDS),
        getSelf(), "tick", getContext().dispatcher(), null);
      // do something useful here
    }
    else {
      unhandled(message);
    }
  }
}
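The difference in mailbox behaviour between the two variants can be illustrated without Akka. The sketch below is a deterministic toy simulation, not Akka code: time advances in abstract units, the "mailbox" is a plain queue, and all names are hypothetical. It assumes an actor that needs longer to process a tick than the tick interval.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TickBacklogSimulation {

  // A timer enqueues a tick every `interval` time units, regardless of load.
  // Returns the largest mailbox size observed.
  static int maxBacklogFixedRate(int interval, int processingTime, int totalTime) {
    Deque<String> mailbox = new ArrayDeque<>();
    int busyUntil = 0; // time at which the actor finishes its current message
    int maxBacklog = 0;
    for (int now = 0; now < totalTime; now++) {
      if (now % interval == 0) {
        mailbox.add("tick");
      }
      maxBacklog = Math.max(maxBacklog, mailbox.size());
      if (now >= busyUntil && !mailbox.isEmpty()) {
        mailbox.poll();
        busyUntil = now + processingTime;
      }
    }
    return maxBacklog;
  }

  // The next tick is scheduled only when the previous one is received,
  // so at most one tick is ever outstanding.
  static int maxBacklogRescheduleOnReceive(int delay, int processingTime, int totalTime) {
    Deque<String> mailbox = new ArrayDeque<>();
    int busyUntil = 0;
    int nextTickAt = 0; // the single pending one-shot timer, -1 if none
    int maxBacklog = 0;
    for (int now = 0; now < totalTime; now++) {
      if (nextTickAt == now) {
        mailbox.add("tick");
        nextTickAt = -1;
      }
      maxBacklog = Math.max(maxBacklog, mailbox.size());
      if (now >= busyUntil && !mailbox.isEmpty()) {
        mailbox.poll();
        nextTickAt = now + delay; // schedule the next one-shot tick
        busyUntil = now + processingTime;
      }
    }
    return maxBacklog;
  }

  public static void main(String[] args) {
    // tick every 1 unit, 3 units of work per tick, 30 units simulated
    System.out.println("fixed rate:            " + maxBacklogFixedRate(1, 3, 30));
    System.out.println("reschedule on receive: " + maxBacklogRescheduleOnReceive(1, 3, 30));
  }
}
```

In this toy model the fixed-rate backlog keeps growing for as long as the simulation runs, while the reschedule-on-receive backlog never exceeds one message. The price is that the effective period becomes the delay plus whatever time passes before the tick is handled.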

Single-Use Actor Trees with High-Level Error Reporting

Contributed by: Rick Latrine

A convenient way to enter the actor world from Java is Patterns.ask(). This method starts a temporary actor that forwards the message and collects the result from the actor being "asked". If an error occurs within the asked actor, the default supervision handling takes over and the caller of Patterns.ask() is not notified.

If that caller is interested in such an exception, they must make sure that the asked actor replies with Status.Failure(Throwable). Behind the asked actor a complex actor hierarchy might be spawned to accomplish asynchronous work. Then supervision is the established way to control error handling.

Unfortunately, this means the asked actor must know about supervision and must catch the exceptions itself. Such an actor is unlikely to be reusable in a different actor hierarchy and ends up cluttered with try/catch blocks.
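The problem described above can be reproduced in miniature with plain JDK futures. The following sketch is an analogy only and does not use the Akka API: a CompletableFuture stands in for the future returned by Patterns.ask(), completeExceptionally stands in for replying with Status.Failure, and all class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class AskFailureAnalogy {

  // The handler fails without replying, so the reply future is never completed.
  static CompletableFuture<Object> askWithoutFailureReply(
      Function<Object, Object> handler, Object message) {
    CompletableFuture<Object> reply = new CompletableFuture<>();
    try {
      reply.complete(handler.apply(message));
    } catch (RuntimeException e) {
      // nothing is sent back; in Akka the supervisor would deal with the failure
    }
    return reply;
  }

  // The handler code catches the failure and reports it back explicitly.
  static CompletableFuture<Object> askWithFailureReply(
      Function<Object, Object> handler, Object message) {
    CompletableFuture<Object> reply = new CompletableFuture<>();
    try {
      reply.complete(handler.apply(message));
    } catch (RuntimeException e) {
      reply.completeExceptionally(e); // counterpart of replying with Status.Failure
    }
    return reply;
  }

  // Waits briefly for the reply and describes what the caller observes.
  static String outcome(CompletableFuture<Object> reply) {
    try {
      return "result: " + reply.get(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return "timeout";
    } catch (ExecutionException e) {
      return "failure: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }
}
```

With a handler that always throws, the first variant leaves the caller with nothing but a timeout, while the second delivers the actual cause at the cost of error-handling code inside the worker.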

This pattern provides a way to encapsulate supervision and error propagation to the temporary actor. When an error occurs, the future returned by Patterns.ask() is completed as a failure that carries the exception (see also Actors (Java with Lambda Support) for Java compatibility).

Let's have a look at the example code:

package docs.pattern;

import java.util.concurrent.TimeoutException;

import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import akka.actor.Actor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Cancellable;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.util.Timeout;

public class SupervisedAsk {

  private static class AskParam {
    Props props;
    Object message;
    Timeout timeout;

    AskParam(Props props, Object message, Timeout timeout) {
      this.props = props;
      this.message = message;
      this.timeout = timeout;
    }
  }

  private static class AskTimeout {
  }

  public static class AskSupervisorCreator extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof AskParam) {
        ActorRef supervisor = getContext().actorOf(
            Props.create(AskSupervisor.class));
        supervisor.forward(message, getContext());
      } else {
        unhandled(message);
      }
    }
  }

  public static class AskSupervisor extends UntypedActor {
    private ActorRef targetActor;
    private ActorRef caller;
    private AskParam askParam;
    private Cancellable timeoutMessage;

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return new OneForOneStrategy(0, Duration.Zero(),
          new Function<Throwable, Directive>() {
            public Directive apply(Throwable cause) {
              caller.tell(new Status.Failure(cause), self());
              return SupervisorStrategy.stop();
            }
          });
    }

    @Override
    public void onReceive(Object message) throws Exception {
      if (message instanceof AskParam) {
        askParam = (AskParam) message;
        caller = getSender();
        targetActor = getContext().actorOf(askParam.props);
        getContext().watch(targetActor);
        targetActor.forward(askParam.message, getContext());
        Scheduler scheduler = getContext().system().scheduler();
        timeoutMessage = scheduler.scheduleOnce(askParam.timeout.duration(),
            self(), new AskTimeout(), context().dispatcher(), null);
      } else if (message instanceof Terminated) {
        Throwable ex = new ActorKilledException("Target actor terminated.");
        caller.tell(new Status.Failure(ex), self());
        timeoutMessage.cancel();
        getContext().stop(self());
      } else if (message instanceof AskTimeout) {
        Throwable ex = new TimeoutException("Target actor timed out after "
            + askParam.timeout.toString());
        caller.tell(new Status.Failure(ex), self());
        getContext().stop(self());
      } else {
        unhandled(message);
      }
    }
  }

  public static Future<Object> askOf(ActorRef supervisorCreator, Props props,
      Object message, Timeout timeout) {
    AskParam param = new AskParam(props, message, timeout);
    return Patterns.ask(supervisorCreator, param, timeout);
  }

  public static synchronized ActorRef createSupervisorCreator(
      ActorRefFactory factory) {
    return factory.actorOf(Props.create(AskSupervisorCreator.class));
  }
}

In the askOf method the user message is sent to the AskSupervisorCreator, which creates an AskSupervisor and forwards the message to it. This prevents the actor system from overloading due to actor creations. The AskSupervisor is responsible for creating the user actor, forwarding the message, and handling actor termination and supervision. Additionally, the AskSupervisor stops the user actor if the execution time has expired.

In case of an exception the supervisor tells the temporary actor which exception was thrown. Afterwards the actor hierarchy is stopped.
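The division of responsibilities described above can be sketched with plain JDK concurrency utilities. This is an analogy under stated assumptions, not the Akka implementation: the hypothetical supervisedCall wrapper plays the role of AskSupervisor, the Callable plays the role of the user actor, and the scheduled task corresponds to the AskTimeout message. The work itself contains no error handling; result, exception and timeout are all reported through the one returned future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SupervisedCall {

  // Runs `work` and reports exactly one outcome through the returned future:
  // the result, the thrown exception, or a TimeoutException.
  // The executor needs at least two threads so the timeout can fire while work runs.
  static <T> CompletableFuture<T> supervisedCall(
      Callable<T> work, long timeoutMillis, ScheduledExecutorService executor) {
    CompletableFuture<T> reply = new CompletableFuture<>();

    // counterpart of the AskTimeout message
    ScheduledFuture<?> timeout = executor.schedule(() -> {
      reply.completeExceptionally(
          new TimeoutException("No reply within " + timeoutMillis + " ms"));
    }, timeoutMillis, TimeUnit.MILLISECONDS);

    executor.execute(() -> {
      try {
        reply.complete(work.call());
      } catch (Exception e) {
        reply.completeExceptionally(e); // counterpart of Status.Failure(cause)
      } finally {
        timeout.cancel(false);
      }
    });
    return reply;
  }

  // Blocks until the future is completed and describes the outcome.
  static String describe(CompletableFuture<?> reply) {
    try {
      return "result: " + reply.get();
    } catch (ExecutionException e) {
      return "failure: " + e.getCause().getClass().getSimpleName();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    try {
      // prints "result: pong"
      System.out.println(describe(supervisedCall(() -> "pong", 1000, executor)));
      // prints "failure: IllegalStateException"
      System.out.println(describe(supervisedCall(
          () -> { throw new IllegalStateException("boom"); }, 1000, executor)));
      // prints "failure: TimeoutException"
      System.out.println(describe(supervisedCall(
          () -> { Thread.sleep(5000); return "late"; }, 50, executor)));
    } finally {
      executor.shutdownNow();
    }
  }
}
```

Unlike AskSupervisor, which stops the user actor when the timeout expires, this sketch leaves the timed-out work running until the executor is shut down.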

Finally we are able to execute an actor and receive the results or exceptions.

package docs.pattern;

import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.util.Timeout;

public class SupervisedAskSpec {

  public Object execute(Class<? extends UntypedActor> someActor,
      Object message, Timeout timeout, ActorRefFactory actorSystem)
      throws Exception {
    // example usage
    try {
      ActorRef supervisorCreator = SupervisedAsk
          .createSupervisorCreator(actorSystem);
      Future<Object> finished = SupervisedAsk.askOf(supervisorCreator,
          Props.create(someActor), message, timeout);
      return Await.result(finished, timeout.duration());
    } catch (Exception e) {
      // exception propagated by supervision
      throw e;
    }
  }
}
