Fault Tolerance
As explained in Actor Systems, each actor is the supervisor of its children, and as such each actor defines a fault handling supervisor strategy. This strategy cannot be changed afterwards, as it is an integral part of the actor system’s structure.
Fault Handling in Practice
First, let us look at a sample that illustrates one way to handle data store errors, which are a typical source of failure in real-world applications. What can be done when the data store is unavailable depends on the actual application, but in this sample we use a best-effort reconnect approach.
Read the following source code. The inlined comments explain the different pieces of the fault handling and why they are added. It is also highly recommended to run this sample, as the log output makes it easy to follow what is happening at runtime.
Creating a Supervisor Strategy
The following sections explain the fault handling mechanism and alternatives in more depth.
For the sake of demonstration let us consider the following strategy:
private static SupervisorStrategy strategy =
  new OneForOneStrategy(10, Duration.create("1 minute"),
    new Function<Throwable, Directive>() {
      @Override
      public Directive apply(Throwable t) {
        if (t instanceof ArithmeticException) {
          return resume();
        } else if (t instanceof NullPointerException) {
          return restart();
        } else if (t instanceof IllegalArgumentException) {
          return stop();
        } else {
          return escalate();
        }
      }
    });

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}
I have chosen a few well-known exception types in order to demonstrate the
application of the fault handling directives described in Supervision and Monitoring.
First off, it is a one-for-one strategy, meaning that each child is treated
separately (an all-for-one strategy works very similarly, the only difference
being that any decision is applied to all children of the supervisor, not only
the failing one). There are limits set on the restart frequency, namely a
maximum of 10 restarts per minute. Passing -1 as the restart count or
Duration.Inf() as the time window means that the respective limit does not
apply, which makes it possible to specify an absolute upper limit on the
restarts or to let restarts continue indefinitely.
The child actor is stopped if the limit is exceeded.
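The restart limit described above can be pictured with a small stand-alone model. This is only a sketch based on the paragraph, not Akka's implementation: the class `RestartBudget`, its `permitRestart` method and the `NO_LIMIT` constant are hypothetical names introduced for illustration, and time is passed in explicitly as non-negative milliseconds.

```java
/** Hypothetical model of a "max N restarts per time window" limit; not Akka code. */
final class RestartBudget {
    /** Assumed marker meaning "this limit does not apply" (mirrors the -1 above). */
    static final int NO_LIMIT = -1;

    private final int maxRetries;     // NO_LIMIT disables the count limit
    private final long windowMillis;  // NO_LIMIT disables the time window
    private int restartsInWindow = 0;
    private long windowStart = -1;    // -1 until the first failure is seen

    RestartBudget(int maxRetries, long windowMillis) {
        this.maxRetries = maxRetries;
        this.windowMillis = windowMillis;
    }

    /** Returns true if a restart at nowMillis (non-negative) is within budget. */
    boolean permitRestart(long nowMillis) {
        if (maxRetries == NO_LIMIT) {
            return true; // restarts are allowed indefinitely
        }
        boolean windowed = windowMillis != NO_LIMIT;
        if (windowed && (windowStart < 0 || nowMillis - windowStart > windowMillis)) {
            // first failure, or the previous window has expired: start counting afresh
            windowStart = nowMillis;
            restartsInWindow = 0;
        }
        restartsInWindow++;
        return restartsInWindow <= maxRetries;
    }

    public static void main(String[] args) {
        RestartBudget budget = new RestartBudget(2, 1000);
        System.out.println(budget.permitRestart(0));    // true
        System.out.println(budget.permitRestart(100));  // true
        System.out.println(budget.permitRestart(200));  // false: third restart inside the window
        System.out.println(budget.permitRestart(2000)); // true: a new window has started
    }
}
```

In this model a `false` result corresponds to the point at which the supervisor would stop the child instead of restarting it.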
Note
If the strategy is declared inside the supervising actor (as opposed to
a separate class), its decider has access to all internal state of
the actor in a thread-safe fashion, including obtaining a reference to the
currently failed child (available via getSender(), the sender of the failure message).
Default Supervisor Strategy
Escalate is used if the defined strategy doesn't cover the exception that was thrown.
When the supervisor strategy is not defined for an actor, the following exceptions are handled by default:
- ActorInitializationException will stop the failing child actor
- ActorKilledException will stop the failing child actor
- Exception will restart the failing child actor
- Other types of Throwable will be escalated to the parent actor
If the exception escalates all the way up to the root guardian, it will handle it in the same way as the default strategy defined above.
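The default mapping listed above can be read as an ordered chain of type checks. The following is a stand-alone sketch of that reading, not Akka's own decider: the `Directive` enum and the two exception classes are hypothetical stand-ins defined locally (the real types live in `akka.actor`), so that the example compiles without Akka on the classpath.

```java
/** Stand-alone sketch of the default decision table; all types here are local stand-ins. */
final class DefaultDeciderSketch {
    enum Directive { RESUME, RESTART, STOP, ESCALATE }

    /** Hypothetical stand-in for akka.actor.ActorInitializationException. */
    static class ActorInitializationException extends RuntimeException {}

    /** Hypothetical stand-in for akka.actor.ActorKilledException. */
    static class ActorKilledException extends RuntimeException {}

    /**
     * Order matters: the two special cases are Exceptions as well, so they
     * must be checked before the general Exception branch.
     */
    static Directive decide(Throwable t) {
        if (t instanceof ActorInitializationException) {
            return Directive.STOP;
        } else if (t instanceof ActorKilledException) {
            return Directive.STOP;
        } else if (t instanceof Exception) {
            return Directive.RESTART;
        } else {
            return Directive.ESCALATE; // any other Throwable, e.g. an Error
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(new ActorKilledException()));  // STOP
        System.out.println(decide(new IllegalStateException())); // RESTART
        System.out.println(decide(new StackOverflowError()));    // ESCALATE
    }
}
```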
Stopping Supervisor Strategy
Closer to the Erlang way is the strategy to just stop children when they fail
and then take corrective action in the supervisor when DeathWatch signals the
loss of the child. This strategy is also provided pre-packaged as
SupervisorStrategy.stoppingStrategy with an accompanying
StoppingSupervisorStrategy configurator to be used when you want the "/user"
guardian to apply it.
Logging of Actor Failures
By default the SupervisorStrategy logs failures unless they are escalated.
Escalated failures are supposed to be handled, and potentially logged, at a level
higher in the hierarchy.
You can mute the default logging of a SupervisorStrategy by setting
loggingEnabled to false when instantiating it. Customized logging can be done
inside the Decider. Note that the reference to the currently failed child is
available via getSender() when the SupervisorStrategy is declared inside the
supervising actor.
You may also customize the logging in your own SupervisorStrategy implementation
by overriding the logFailure method.
Supervision of Top-Level Actors
Top-level actors are those created using system.actorOf(); they are children
of the User Guardian. No special rules apply in this case: the guardian simply
applies the configured strategy.
Test Application
The following section shows the effects of the different directives in practice, for which a test setup is needed. First off, we need a suitable supervisor:
public class Supervisor extends UntypedActor {

  private static SupervisorStrategy strategy =
    new OneForOneStrategy(10, Duration.create("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  public void onReceive(Object o) {
    if (o instanceof Props) {
      // create the requested child and reply with its ActorRef
      getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
      unhandled(o);
    }
  }
}
This supervisor will be used to create a child, with which we can experiment:
public class Child extends UntypedActor {
  int state = 0;

  public void onReceive(Object o) throws Exception {
    if (o instanceof Exception) {
      // fail on purpose so that the supervisor strategy is triggered
      throw (Exception) o;
    } else if (o instanceof Integer) {
      state = (Integer) o;
    } else if (o.equals("get")) {
      getSender().tell(state, getSelf());
    } else {
      unhandled(o);
    }
  }
}
Testing is easier with the utilities described in Testing Actor Systems,
where TestProbe provides an actor ref useful for receiving and inspecting replies.
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.actor.SupervisorStrategy.escalate;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.japi.Function;
import scala.Option;
import scala.collection.immutable.Seq;
import scala.concurrent.Await;
import static akka.pattern.Patterns.ask;
import scala.concurrent.duration.Duration;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.SECONDS;

public class FaultHandlingTest {
  static ActorSystem system;
  Duration timeout = Duration.create(5, SECONDS);

  @BeforeClass
  public static void start() {
    system = ActorSystem.create("test");
  }

  @AfterClass
  public static void cleanup() {
    JavaTestKit.shutdownActorSystem(system);
    system = null;
  }

  @Test
  public void mustEmploySupervisorStrategy() throws Exception {
    // code here
  }
}
Let us create actors:
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);
The first test shall demonstrate the Resume directive, so we try it out by
setting some non-initial state in the actor and then having it fail:
child.tell(42, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
As you can see, the value 42 survives the fault handling directive. Now, if we
change the failure to a more serious NullPointerException, that will no
longer be the case:
child.tell(new NullPointerException(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
And finally, in case of the fatal IllegalArgumentException, the child will be
terminated by the supervisor:
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
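The three outcomes exercised so far (Resume keeps the state, Restart resets it, Stop terminates the child) can also be pictured without an actor system. The following is a deliberately simplified, stand-alone sketch: `ChildCell`, its `fail` method and the local `Directive` enum are hypothetical names for illustration and are not part of Akka.

```java
import java.util.function.Supplier;

/** Stand-alone model of what each directive does to a child's state; not Akka code. */
final class DirectiveEffectSketch {
    enum Directive { RESUME, RESTART, STOP }

    /** Minimal stand-in for the Child actor: it only holds an int state. */
    static final class Child {
        int state = 0;
    }

    /** Hypothetical holder that plays the role of the reference to the child. */
    static final class ChildCell {
        private final Supplier<Child> factory;
        private Child instance; // null once the child has been stopped

        ChildCell(Supplier<Child> factory) {
            this.factory = factory;
            this.instance = factory.get();
        }

        void set(int value) {
            if (instance != null) {
                instance.state = value;
            }
        }

        /** Returns the current state, or null if the child has been stopped. */
        Integer get() {
            return instance == null ? null : instance.state;
        }

        boolean isTerminated() {
            return instance == null;
        }

        /** Applies the supervisor's decision for a failure of this child. */
        void fail(Directive directive) {
            switch (directive) {
                case RESUME:
                    break;                    // same instance, state untouched
                case RESTART:
                    instance = factory.get(); // fresh instance behind the same reference
                    break;
                case STOP:
                    instance = null;          // the child is gone for good
                    break;
            }
        }
    }

    public static void main(String[] args) {
        ChildCell child = new ChildCell(Child::new);
        child.set(42);
        child.fail(Directive.RESUME);
        System.out.println(child.get());          // 42
        child.fail(Directive.RESTART);
        System.out.println(child.get());          // 0
        child.fail(Directive.STOP);
        System.out.println(child.isTerminated()); // true
    }
}
```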
Up to now the supervisor was completely unaffected by the child’s failure,
because the directives set did handle it. In case of an Exception, this is not
true anymore and the supervisor escalates the failure.
child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception(), ActorRef.noSender());
probe.expectMsgClass(Terminated.class);
The supervisor itself is supervised by the top-level actor provided by the
ActorSystem, which has the default policy to restart in case of all
Exception cases (with the notable exceptions of ActorInitializationException
and ActorKilledException). Since the default behavior in case of a restart is
to kill all children, we expected our poor child not to survive this failure.
In case this is not desired (which depends on the use case), we need to use a different supervisor which overrides this behavior.
public class Supervisor2 extends UntypedActor {

  private static SupervisorStrategy strategy = new OneForOneStrategy(10,
    Duration.create("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }

  public void onReceive(Object o) {
    if (o instanceof Props) {
      getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
      unhandled(o);
    }
  }

  @Override
  public void preRestart(Throwable cause, Option<Object> msg) {
    // do not kill all children, which is the default here
  }
}
With this parent, the child survives the escalated restart, as demonstrated in the last test:
superprops = Props.create(Supervisor2.class);
supervisor = system.actorOf(superprops);
child = (ActorRef) Await.result(ask(supervisor,
  Props.create(Child.class), 5000), timeout);
child.tell(23, ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception(), ActorRef.noSender());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
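The difference between the two supervisors comes down to what happens in the preRestart hook. As a rough stand-alone illustration (a sketch under simplifying assumptions, not Akka's restart procedure), the model below uses the hypothetical classes `Parent`, `KeepingParent` and `Child`: the default hook stops every child, while the overriding one leaves them in place so that they are restarted, with their state reset, along with the parent.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of the preRestart hook's effect on children; not Akka code. */
final class PreRestartSketch {
    /** Minimal stand-in for a child actor. */
    static final class Child {
        int state = 0;
        boolean stopped = false;
    }

    /** Hypothetical parent whose default hook stops all children before a restart. */
    static class Parent {
        final List<Child> children = new ArrayList<>();

        Child spawn() {
            Child child = new Child();
            children.add(child);
            return child;
        }

        /** Default behavior in this model: stop every child. */
        void preRestart() {
            for (Child child : children) {
                child.stopped = true;
            }
            children.clear();
        }

        /** Runs the hook, then restarts (resets) whichever children are left. */
        final void restart() {
            preRestart();
            for (Child child : children) {
                child.state = 0;
            }
        }
    }

    /** Counterpart of Supervisor2 in this model: the hook keeps the children. */
    static final class KeepingParent extends Parent {
        @Override
        void preRestart() {
            // intentionally empty: children are not stopped
        }
    }

    public static void main(String[] args) {
        Parent plain = new Parent();
        Child first = plain.spawn();
        plain.restart();
        System.out.println(first.stopped);  // true

        Parent keeping = new KeepingParent();
        Child second = keeping.spawn();
        second.state = 23;
        keeping.restart();
        System.out.println(second.stopped); // false
        System.out.println(second.state);   // 0
    }
}
```

The final two lines mirror the last test above: the child is still there after the escalated failure, but it answers with its initial state.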