Fault Tolerance

Dependency

Fault tolerance is a feature of actors, so to use it, add the akka-actor dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-actor_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-actor_2.12:2.5.32"
}

Introduction

As explained in Actor Systems, each actor is the supervisor of its children, and as such each actor defines a fault-handling supervisor strategy. This strategy cannot be changed afterwards, as it is an integral part of the actor system’s structure.

Fault Handling in Practice

First, let us look at a sample that illustrates one way to handle data store errors, which are a typical source of failure in real-world applications. What can be done when the data store is unavailable depends on the actual application; this sample uses a best-effort reconnect approach.

Read the following source code. The inlined comments explain the different pieces of the fault handling and why they are added. It is also highly recommended to run this sample as it is easy to follow the log output to understand what is happening at runtime.

Creating a Supervisor Strategy

The following sections explain the fault handling mechanism and alternatives in more depth.

For the sake of demonstration let us consider the following strategy:

Scala
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }
Java
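import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import java.time.Duration;

private static SupervisorStrategy strategy =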
    new OneForOneStrategy(
        10,
        Duration.ofMinutes(1),
        DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
            .match(NullPointerException.class, e -> SupervisorStrategy.restart())
            .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
            .matchAny(o -> SupervisorStrategy.escalate())
            .build());

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

We have chosen a few well-known exception types in order to demonstrate the application of the fault handling directives described in supervision. First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly; the only difference is that any decision is applied to all children of the supervisor, not only the failing one). In the above example, 10 and 1 minute (Scala) or Duration.ofMinutes(1) (Java) are passed to the maxNrOfRetries and withinTimeRange parameters respectively, which means that the strategy restarts a child at most 10 times per minute. The child actor is stopped if the restart count exceeds maxNrOfRetries during the withinTimeRange duration.

Also, there are special values for these parameters. If you specify:

  • -1 to maxNrOfRetries, and Duration.Inf (Scala) or Duration.Inf() (Java) to withinTimeRange
    • then the child is always restarted without any limit
  • -1 to maxNrOfRetries, and a non-infinite Duration to withinTimeRange
    • maxNrOfRetries is treated as 1
  • a non-negative number to maxNrOfRetries and Duration.Inf (Scala) or Duration.Inf() (Java) to withinTimeRange
    • withinTimeRange is treated as an infinite duration, i.e. no matter how long it takes, once the restart count exceeds maxNrOfRetries, the child actor is stopped
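
The rules in this list can be sketched as a small plain-Java model. This is only an illustration, not Akka's implementation: the class RestartLimitModel, the INFINITE marker and the mayRestart method are hypothetical names. Two points are modeling assumptions: the time window is fixed, starting at the first failure and starting afresh once it has elapsed, and a limit of 0 never permits a restart. Time is passed in explicitly so that the behavior is easy to follow.

```java
// Toy model of the restart-limit rules listed above; hypothetical names, not Akka code.
final class RestartLimitModel {
  // Hypothetical marker standing in for an infinite withinTimeRange.
  static final long INFINITE = -1L;

  private final int maxNrOfRetries; // negative means "no explicit limit"
  private final long withinMillis;  // INFINITE means "no time window"
  private int restartsInWindow = 0;
  private boolean windowOpen = false;
  private long windowStartMillis = 0L;

  RestartLimitModel(int maxNrOfRetries, long withinMillis) {
    this.maxNrOfRetries = maxNrOfRetries;
    this.withinMillis = withinMillis;
  }

  // Returns true if a failure at nowMillis may be answered with a restart,
  // false if the child should be stopped instead.
  boolean mayRestart(long nowMillis) {
    boolean limited = maxNrOfRetries >= 0;
    boolean windowed = withinMillis != INFINITE;

    if (!limited && !windowed) {
      return true; // -1 and infinite duration: always restart
    }
    if (limited && maxNrOfRetries == 0) {
      return false; // assumption: a limit of 0 never permits a restart
    }
    int limit = limited ? maxNrOfRetries : 1; // -1 with a finite window is treated as 1

    if (!windowed) {
      // Infinite duration: count every restart, however far apart.
      restartsInWindow++;
      return restartsInWindow <= limit;
    }
    if (!windowOpen) {
      windowOpen = true;
      windowStartMillis = nowMillis;
    }
    if (nowMillis - windowStartMillis <= withinMillis) {
      restartsInWindow++;
      return restartsInWindow <= limit;
    }
    // Window elapsed (assumption: fixed window): start a new one with this restart.
    restartsInWindow = 1;
    windowStartMillis = nowMillis;
    return true;
  }

  public static void main(String[] args) {
    RestartLimitModel model = new RestartLimitModel(2, 1000L);
    System.out.println(model.mayRestart(0L));  // true
    System.out.println(model.mayRestart(10L)); // true
    System.out.println(model.mayRestart(20L)); // false
  }
}
```

With a limit of 2 restarts per second, the third failure inside the same second is the one that leads to the child being stopped.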

In Scala, the match statement which forms the bulk of the body is of type Decider, which is a PartialFunction[Throwable, Directive]. In Java, it consists of a PFBuilder returned by DeciderBuilder’s match method, where the builder is finished by the build method. This is the piece which maps child failure types to their corresponding directives.
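
To make the mapping concrete, here is a minimal plain-Java sketch of a decider as an ordered list of rules. It does not use the Akka API: MiniDecider, its nested Directive enum and the decide method are hypothetical names chosen for illustration. It shows two properties of the strategy above: the first matching rule wins, so specific exception types should come before general ones, and a failure that no rule covers is escalated.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy decider: an ordered set of (exception type -> directive) rules.
// Hypothetical names; this is not the akka.actor API.
final class MiniDecider {
  enum Directive { RESUME, RESTART, STOP, ESCALATE }

  private final Map<Class<? extends Throwable>, Directive> rules = new LinkedHashMap<>();

  // Rules are checked in insertion order, so register specific types first.
  MiniDecider match(Class<? extends Throwable> type, Directive directive) {
    rules.put(type, directive);
    return this;
  }

  // The first rule whose type matches the failure wins; no match means ESCALATE.
  Directive decide(Throwable failure) {
    for (Map.Entry<Class<? extends Throwable>, Directive> rule : rules.entrySet()) {
      if (rule.getKey().isInstance(failure)) {
        return rule.getValue();
      }
    }
    return Directive.ESCALATE;
  }

  public static void main(String[] args) {
    MiniDecider decider = new MiniDecider()
        .match(ArithmeticException.class, Directive.RESUME)
        .match(NullPointerException.class, Directive.RESTART)
        .match(IllegalArgumentException.class, Directive.STOP);

    System.out.println(decider.decide(new ArithmeticException()));   // RESUME
    System.out.println(decider.decide(new NumberFormatException())); // STOP (subclass of IllegalArgumentException)
    System.out.println(decider.decide(new IllegalStateException())); // ESCALATE (no rule matches)
  }
}
```

Matching is by type hierarchy, which is why a NumberFormatException is handled by the IllegalArgumentException rule.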

Note

If the strategy is declared inside the supervising actor (as opposed to within a companion object in Scala or a separate class in Java), its decider has access to all internal state of the actor in a thread-safe fashion, including obtaining a reference to the currently failed child (available as the sender of the failure message: sender() in Scala, getSender() in Java).

Default Supervisor Strategy

Escalate is used if the defined strategy doesn’t cover the exception that was thrown.

When the supervisor strategy is not defined for an actor the following exceptions are handled by default:

  • ActorInitializationException will stop the failing child actor
  • ActorKilledException will stop the failing child actor
  • DeathPactException will stop the failing child actor
  • Exception will restart the failing child actor
  • Other types of Throwable will be escalated to the parent actor

If the exception escalates all the way up to the root guardian, the root guardian handles it in the same way as the default strategy defined above.

You can combine your own strategy with the default strategy:

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException => Resume
    case t =>
      super.supervisorStrategy.decider.applyOrElse(t, (_: Any) => Escalate)
  }

Stopping Supervisor Strategy

Closer to the Erlang way is the strategy to stop children when they fail and then take corrective action in the supervisor when DeathWatch signals the loss of the child. This strategy is also provided pre-packaged as SupervisorStrategy.stoppingStrategy with an accompanying StoppingSupervisorStrategy configurator to be used when you want the "/user" guardian to apply it.
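
As a sketch, the configurator is selected through the akka.actor.guardian-supervisor-strategy setting, which takes the fully qualified class name of a SupervisorStrategyConfigurator. Placing the setting in application.conf is an assumption about how your project loads its configuration.

```hocon
# Make the "/user" guardian stop failing top-level actors instead of restarting them
akka.actor.guardian-supervisor-strategy = "akka.actor.StoppingSupervisorStrategy"
```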

Logging of Actor Failures

By default the SupervisorStrategy logs failures unless they are escalated. Escalated failures are supposed to be handled, and potentially logged, at a level higher in the hierarchy.

You can mute the default logging of a SupervisorStrategy by setting loggingEnabled to false when instantiating it. Customized logging can be done inside the Decider. Note that the reference to the currently failed child is available as the sender when the SupervisorStrategy is declared inside the supervising actor.

You may also customize the logging in your own SupervisorStrategy implementation by overriding the logFailure method.

Supervision of Top-Level Actors

Top-level actors are those created using system.actorOf(); they are children of the User Guardian. No special rules apply in this case: the guardian applies the configured strategy.

Test Application

The following section shows the effects of the different directives in practice, for which a test setup is needed. First off, we need a suitable supervisor:

Scala
import akka.actor.{ Actor, Props }

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
}
Java
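import akka.actor.AbstractActor;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import java.time.Duration;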

static class Supervisor extends AbstractActor {

  private static SupervisorStrategy strategy =
      new OneForOneStrategy(
          10,
          Duration.ofMinutes(1),
          DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
              .match(NullPointerException.class, e -> SupervisorStrategy.restart())
              .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
              .matchAny(o -> SupervisorStrategy.escalate())
              .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Props.class,
            props -> {
              getSender().tell(getContext().actorOf(props), getSelf());
            })
        .build();
  }
}

This supervisor will be used to create a child, with which we can experiment:

Scala
import akka.actor.Actor

class Child extends Actor {
  var state = 0
  def receive = {
    case ex: Exception => throw ex
    case x: Int        => state = x
    case "get"         => sender() ! state
  }
}
Java
static class Child extends AbstractActor {
  int state = 0;

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Exception.class,
            exception -> {
              throw exception;
            })
        .match(Integer.class, i -> state = i)
        .matchEquals("get", s -> getSender().tell(state, getSelf()))
        .build();
  }
}

The test is easier to write with the utilities described in Testing Actor Systems (TestKit), where TestProbe provides an actor ref useful for receiving and inspecting replies.

Scala
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }
import akka.actor.{ ActorRef, ActorSystem, Props, Terminated }
import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestKit }

class FaultHandlingDocSpec(_system: ActorSystem)
    extends TestKit(_system)
    with ImplicitSender
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  def this() =
    this(
      ActorSystem(
        "FaultHandlingDocSpec",
        ConfigFactory.parseString("""
      akka {
        loggers = ["akka.testkit.TestEventListener"]
        loglevel = "WARNING"
      }
      """)))

  override def afterAll: Unit = {
    TestKit.shutdownActorSystem(system)
  }

  "A supervisor" must {
    "apply the chosen strategy for its child" in {
      // code here
    }
  }
}
Java
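import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.testkit.javadsl.TestKit;
import akka.testkit.TestProbe;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static akka.pattern.Patterns.ask;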
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import static java.util.concurrent.TimeUnit.SECONDS;
import static akka.japi.Util.immutableSeq;
import scala.concurrent.Await;

public class FaultHandlingTest extends AbstractJavaTest {
  static ActorSystem system;
  scala.concurrent.duration.Duration timeout =
      scala.concurrent.duration.Duration.create(5, SECONDS);

  @BeforeClass
  public static void start() {
    system = ActorSystem.create("FaultHandlingTest", config);
  }

  @AfterClass
  public static void cleanup() {
    TestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void mustEmploySupervisorStrategy() throws Exception {
    // code here
  }
}

Let us create actors:

Scala
val supervisor = system.actorOf(Props[Supervisor], "supervisor")

supervisor ! Props[Child]
val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor
Java
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child =
    (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);

The first test shall demonstrate the Resume directive, so we try it out by setting some non-initial state in the actor and have it fail:

Scala
child ! 42 // set state to 42
child ! "get"
expectMsg(42)

child ! new ArithmeticException // crash it
child ! "get"
expectMsg(42)
Java
child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);

As you can see the value 42 survives the fault handling directive. Now, if we change the failure to a more serious NullPointerException, that will no longer be the case:

Scala
child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(0)
Java
child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);

And finally in case of the fatal IllegalArgumentException the child will be terminated by the supervisor:

Scala
watch(child) // have testActor watch “child”
child ! new IllegalArgumentException // break it
expectMsgPF() { case Terminated(`child`) => () }
Java
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

Up to now the supervisor was completely unaffected by the child’s failure, because the configured directives handled it. In case of a plain Exception, this is no longer true and the supervisor escalates the failure.

Scala
supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]
watch(child2)
child2 ! "get" // verify it is alive
expectMsg(0)

child2 ! new Exception("CRASH") // escalate failure
expectMsgPF() {
  case t @ Terminated(`child2`) if t.existenceConfirmed => ()
}
Java
child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);

The supervisor itself is supervised by the top-level actor provided by the ActorSystem, which has the default policy to restart in case of all Exception cases (with the notable exceptions of ActorInitializationException and ActorKilledException). Since the default behavior of an actor on restart is to kill all its children, we expected our poor child not to survive this failure.

In case this is not desired (which depends on the use case), we need to use a different supervisor which overrides this behavior.

Scala
class Supervisor2 extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    case p: Props => sender() ! context.actorOf(p)
  }
  // override default to kill all children during restart
  override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {}
}
Java
import java.util.Optional;

static class Supervisor2 extends AbstractActor {

  private static SupervisorStrategy strategy =
      new OneForOneStrategy(
          10,
          Duration.ofMinutes(1),
          DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
              .match(NullPointerException.class, e -> SupervisorStrategy.restart())
              .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
              .matchAny(o -> SupervisorStrategy.escalate())
              .build());

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            Props.class,
            props -> {
              getSender().tell(getContext().actorOf(props), getSelf());
            })
        .build();
  }

  @Override
  public void preRestart(Throwable cause, Optional<Object> msg) {
    // do not kill all children, which is the default here
  }
}

With this parent, the child survives the escalated restart, as demonstrated in the last test:

Scala
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

supervisor2 ! Props[Child]
val child3 = expectMsgType[ActorRef]

child3 ! 23
child3 ! "get"
expectMsg(23)

child3 ! new Exception("CRASH")
child3 ! "get"
expectMsg(0)
Java
superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);