Fault Tolerance (Java)

As explained in Actor Systems, each actor is the supervisor of its children, and as such each actor defines a fault handling supervisor strategy. This strategy cannot be changed afterwards, as it is an integral part of the actor system’s structure.

Fault Handling in Practice

First, let us look at a sample that illustrates one way to handle data store errors, which are a typical source of failure in real world applications. What can be done when the data store is unavailable depends on the actual application, but in this sample we use a best effort re-connect approach.

Read the following source code. The inlined comments explain the different pieces of the fault handling and why they are added. It is also highly recommended to run this sample, as the log output makes it easy to follow what is happening at runtime.

// imports ...

public class FaultHandlingDocSample {

  /**
   * Runs the sample
   */
  public static void main(String[] args) {
    Config config = ConfigFactory.parseString("akka.loglevel = DEBUG \n" + "akka.actor.debug.lifecycle = on");

    ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
    ActorRef worker = system.actorOf(new Props(Worker.class), "worker");
    ActorRef listener = system.actorOf(new Props(Listener.class), "listener");
    // start the work and listen on progress
    // note that the listener is used as sender of the tell,
    // i.e. it will receive replies from the worker
    worker.tell(Start, listener);
  }

  /**
   * Listens on progress from the worker and shuts down the system when enough
   * work has been done.
   */
  public static class Listener extends UntypedActor {
    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void preStart() {
      // If we don't get any progress within 15 seconds then the service is unavailable
      getContext().setReceiveTimeout(Duration.parse("15 seconds"));
    }

    public void onReceive(Object msg) {
      log.debug("received message {}", msg);
      if (msg instanceof Progress) {
        Progress progress = (Progress) msg;
        log.info("Current progress: {} %", progress.percent);
        if (progress.percent >= 100.0) {
          log.info("That's all, shutting down");
          getContext().system().shutdown();
        }
      } else if (msg == Actors.receiveTimeout()) {
        // No progress within 15 seconds, ServiceUnavailable
        log.error("Shutting down due to unavailable service");
        getContext().system().shutdown();
      } else {
        unhandled(msg);
      }
    }
  }

  // messages ...

  /**
   * Worker performs some work when it receives the Start message. It will
   * continuously notify the sender of the Start message of current Progress.
   * The Worker supervises the CounterService.
   */
  public static class Worker extends UntypedActor {
    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    final Timeout askTimeout = new Timeout(Duration.parse("5 seconds"));

    // The sender of the initial Start message will continuously be notified about progress
    ActorRef progressListener;
    final ActorRef counterService = getContext().actorOf(new Props(CounterService.class), "counter");
    final int totalCount = 51;

    // Stop the CounterService child if it throws ServiceUnavailable
    private static SupervisorStrategy strategy = new OneForOneStrategy(-1, Duration.Inf(),
        new Function<Throwable, Directive>() {
          @Override
          public Directive apply(Throwable t) {
            if (t instanceof ServiceUnavailable) {
              return stop();
            } else {
              return escalate();
            }
          }
        });

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    public void onReceive(Object msg) {
      log.debug("received message {}", msg);
      if (msg.equals(Start) && progressListener == null) {
        progressListener = getSender();
        getContext().system().scheduler().schedule(Duration.Zero(), Duration.parse("1 second"), getSelf(), Do);
      } else if (msg.equals(Do)) {
        counterService.tell(new Increment(1), getSelf());
        counterService.tell(new Increment(1), getSelf());
        counterService.tell(new Increment(1), getSelf());

        // Send current progress to the initial sender
        pipe(ask(counterService, GetCurrentCount, askTimeout)
               .mapTo(manifest(CurrentCount.class))
               .map(new Mapper<CurrentCount, Progress>() {
            public Progress apply(CurrentCount c) {
                return new Progress(100.0 * c.count / totalCount);
            }
        }))
        .to(progressListener);
      } else {
        unhandled(msg);
      }
    }
  }

  // messages ...

  /**
   * Adds the value received in Increment message to a persistent counter.
   * Replies with CurrentCount when it is asked for CurrentCount. CounterService
   * supervises Storage and Counter.
   */
  public static class CounterService extends UntypedActor {

    // Reconnect message
    static final Object Reconnect = "Reconnect";

    private static class SenderMsgPair {
      final ActorRef sender;
      final Object msg;

      SenderMsgPair(ActorRef sender, Object msg) {
        this.msg = msg;
        this.sender = sender;
      }
    }

    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    final String key = getSelf().path().name();
    ActorRef storage;
    ActorRef counter;
    final List<SenderMsgPair> backlog = new ArrayList<SenderMsgPair>();
    final int MAX_BACKLOG = 10000;

    // Restart the storage child when StorageException is thrown.
    // After 3 restarts within 5 seconds it will be stopped.
    private static SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.parse("5 seconds"),
        new Function<Throwable, Directive>() {
          @Override
          public Directive apply(Throwable t) {
            if (t instanceof StorageException) {
              return restart();
            } else {
              return escalate();
            }
          }
        });

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    @Override
    public void preStart() {
      initStorage();
    }

    /**
     * The child storage is restarted in case of failure, but if it is still
     * failing after 3 restarts it will be stopped. It is better to back off
     * than to keep failing continuously. When it has been stopped we will
     * schedule a Reconnect after a delay. Watch the child so we receive a
     * Terminated message when it has been terminated.
     */
    void initStorage() {
      storage = getContext().watch(getContext().actorOf(new Props(Storage.class), "storage"));
      // Tell the counter, if any, to use the new storage
      if (counter != null)
        counter.tell(new UseStorage(storage), getSelf());
      // We need the initial value to be able to operate
      storage.tell(new Get(key), getSelf());
    }

    @Override
    public void onReceive(Object msg) {
      log.debug("received message {}", msg);
      if (msg instanceof Entry && ((Entry) msg).key.equals(key) && counter == null) {
        // Reply from Storage of the initial value, now we can create the Counter
        final long value = ((Entry) msg).value;
        counter = getContext().actorOf(new Props().withCreator(new UntypedActorFactory() {
          public Actor create() {
            return new Counter(key, value);
          }
        }));
        // Tell the counter to use current storage
        counter.tell(new UseStorage(storage), getSelf());
        // and send the buffered backlog to the counter
        for (SenderMsgPair each : backlog) {
          counter.tell(each.msg, each.sender);
        }
        backlog.clear();
      } else if (msg instanceof Increment) {
        forwardOrPlaceInBacklog(msg);
      } else if (msg.equals(GetCurrentCount)) {
        forwardOrPlaceInBacklog(msg);
      } else if (msg instanceof Terminated) {
        // After 3 restarts the storage child is stopped.
        // We receive Terminated because we watch the child, see initStorage.
        storage = null;
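        // Tell the counter, if any, that there is no storage for the moment.
        // The counter is still null if storage failed before the initial value arrived.
        if (counter != null)
          counter.tell(new UseStorage(null), getSelf());
        // Try to re-establish storage after a while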
        getContext().system().scheduler().scheduleOnce(Duration.parse("10 seconds"), getSelf(), Reconnect);
      } else if (msg.equals(Reconnect)) {
        // Re-establish storage after the scheduled delay
        initStorage();
      } else {
        unhandled(msg);
      }
    }

    void forwardOrPlaceInBacklog(Object msg) {
      // We need the initial value from storage before we can start delegating to the counter.
      // Before that we place the messages in a backlog, to be sent to the counter when
      // it is initialized.
      if (counter == null) {
        if (backlog.size() >= MAX_BACKLOG)
          throw new ServiceUnavailable("CounterService not available, lack of initial value");
        backlog.add(new SenderMsgPair(getSender(), msg));
      } else {
        counter.forward(msg, getContext());
      }
    }
  }

  // messages ...

  /**
   * The in-memory counter that sends its current value to the Storage,
   * if there is any storage available at the moment.
   */
  public static class Counter extends UntypedActor {
    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    final String key;
    long count;
    ActorRef storage;

    public Counter(String key, long initialValue) {
      this.key = key;
      this.count = initialValue;
    }

    @Override
    public void onReceive(Object msg) {
      log.debug("received message {}", msg);
      if (msg instanceof UseStorage) {
        storage = ((UseStorage) msg).storage;
        storeCount();
      } else if (msg instanceof Increment) {
        count += ((Increment) msg).n;
        storeCount();
      } else if (msg.equals(GetCurrentCount)) {
        getSender().tell(new CurrentCount(key, count), getSelf());
      } else {
        unhandled(msg);
      }
    }

    void storeCount() {
      // Delegate dangerous work, to protect our valuable state.
      // We can continue without storage.
      if (storage != null) {
        storage.tell(new Store(new Entry(key, count)), getSelf());
      }
    }
  }

  // messages ...

  /**
   * Saves key/value pairs to persistent storage when receiving Store message.
   * Replies with current value when receiving Get message. Will throw
   * StorageException if the underlying data store is out of order.
   */
  public static class Storage extends UntypedActor {

    final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
    final DummyDB db = DummyDB.instance;

    @Override
    public void onReceive(Object msg) {
      log.debug("received message {}", msg);
      if (msg instanceof Store) {
        Store store = (Store) msg;
        db.save(store.entry.key, store.entry.value);
      } else if (msg instanceof Get) {
        Get get = (Get) msg;
        Long value = db.load(get.key);
        getSender().tell(new Entry(get.key, value == null ? Long.valueOf(0L) : value), getSelf());
      } else {
        unhandled(msg);
      }
    }
  }

  // dummydb ...
}
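
The backlog that CounterService keeps while it waits for the initial value can also be looked at in isolation. The following is a minimal plain-Java sketch of that idea, written without Akka so that it runs on its own. The Backlog class, its method names and the delivered list (which stands in for the Counter actor) are hypothetical and only serve the illustration; IllegalStateException takes the place of ServiceUnavailable.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the backlog idea; not an Akka class.
final class Backlog<T> {
  private final int maxSize;
  private final List<T> pending = new ArrayList<T>();
  // Stands in for the Counter actor: messages end up here once it exists
  private final List<T> delivered = new ArrayList<T>();
  private boolean targetReady = false;

  Backlog(int maxSize) {
    this.maxSize = maxSize;
  }

  // Deliver directly if the target exists, otherwise buffer the message.
  // A full buffer is reported as a failure, like ServiceUnavailable above.
  void submit(T msg) {
    if (targetReady) {
      delivered.add(msg);
    } else {
      if (pending.size() >= maxSize)
        throw new IllegalStateException("backlog full, target not available");
      pending.add(msg);
    }
  }

  // Called once the target has been created: flush the buffer in arrival order.
  void targetCreated() {
    targetReady = true;
    delivered.addAll(pending);
    pending.clear();
  }

  List<T> delivered() {
    return delivered;
  }
}
```

Bounding the buffer is the important design choice here: an unbounded backlog would hide a permanently unavailable store until memory runs out, whereas a bounded one turns the problem into a failure that the supervisor can act on.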

Creating a Supervisor Strategy

The following sections explain the fault handling mechanism and alternatives in more depth.

For the sake of demonstration let us consider the following strategy:

private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
    new Function<Throwable, Directive>() {
      @Override
      public Directive apply(Throwable t) {
        if (t instanceof ArithmeticException) {
          return resume();
        } else if (t instanceof NullPointerException) {
          return restart();
        } else if (t instanceof IllegalArgumentException) {
          return stop();
        } else {
          return escalate();
        }
      }
    });

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}

I have chosen a few well-known exception types in order to demonstrate the application of the fault handling directives described in Supervision and Monitoring. First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly, the only difference being that any decision is applied to all children of the supervisor, not only the failing one). There are limits set on the restart frequency, namely a maximum of 10 restarts per minute. Passing -1 or Duration.Inf() means that the respective limit does not apply, which makes it possible either to specify an absolute upper limit on the number of restarts or to allow restarts indefinitely.
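
To make the restart limit more concrete, here is a minimal plain-Java sketch of a counter that permits at most a given number of restarts within a sliding time window, with -1 meaning that no limit applies. The RestartWindow class and its tryRestart method are hypothetical and not part of Akka; the bookkeeping that Akka performs internally may well differ in detail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper, not an Akka class: models "at most maxRetries
// restarts within windowMillis". maxRetries == -1 disables the limit;
// pass Long.MAX_VALUE as windowMillis for an unbounded window.
final class RestartWindow {
  private final int maxRetries;
  private final long windowMillis;
  // Timestamps of the restarts that are still inside the window
  private final Deque<Long> restarts = new ArrayDeque<Long>();

  RestartWindow(int maxRetries, long windowMillis) {
    this.maxRetries = maxRetries;
    this.windowMillis = windowMillis;
  }

  // Returns true and records the restart if it is still permitted at
  // time nowMillis; returns false once the limit has been reached.
  boolean tryRestart(long nowMillis) {
    if (maxRetries < 0) return true; // no limit configured
    // Forget restarts that have fallen out of the window
    while (!restarts.isEmpty() && nowMillis - restarts.peekFirst() >= windowMillis) {
      restarts.removeFirst();
    }
    if (restarts.size() >= maxRetries) return false;
    restarts.addLast(nowMillis);
    return true;
  }
}
```

In this model a supervisor would stop the child as soon as tryRestart returns false, which corresponds to the "after 3 restarts within 5 seconds it will be stopped" comment in the CounterService sample.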

Default Supervisor Strategy

Escalate is used if the defined strategy doesn’t cover the exception that was thrown.

When the supervisor strategy is not defined for an actor the following exceptions are handled by default:

  • ActorInitializationException will stop the failing child actor
  • ActorKilledException will stop the failing child actor
  • Exception will restart the failing child actor
  • Other types of Throwable will be escalated to the parent actor

If the exception escalates all the way up to the root guardian, it will be handled there in the same way as by the default strategy defined above.
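
The default rules listed above can be read as a single decision function. The following plain-Java sketch spells that out; it deliberately does not use Akka, so the Directive enum and the two exception classes InitializationFailure and KilledFailure are hypothetical stand-ins for Akka's directives, ActorInitializationException and ActorKilledException.

```java
// Hypothetical stand-ins so that the sketch compiles without Akka.
enum Directive { RESUME, RESTART, STOP, ESCALATE }

// Stands in for ActorInitializationException
class InitializationFailure extends RuntimeException {}

// Stands in for ActorKilledException
class KilledFailure extends RuntimeException {}

final class DefaultDecider {
  // The two specific exception types must be tested before the general
  // Exception case, because both of them are Exceptions as well.
  static Directive decide(Throwable t) {
    if (t instanceof InitializationFailure) return Directive.STOP;
    if (t instanceof KilledFailure) return Directive.STOP;
    if (t instanceof Exception) return Directive.RESTART;
    return Directive.ESCALATE; // e.g. Errors
  }
}
```

Note that the order of the checks carries the meaning: moving the Exception case to the top would restart children that should be stopped.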

Test Application

The following section shows the effects of the different directives in practice, for which a test setup is needed. First off, we need a suitable supervisor:

static public class Supervisor extends UntypedActor {

  private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  public void onReceive(Object o) {
    if (o instanceof Props) {
      getSender().tell(getContext().actorOf((Props) o));
    } else {
      unhandled(o);
    }
  }
}

This supervisor will be used to create a child, with which we can experiment:

static public class Child extends UntypedActor {
  int state = 0;

  public void onReceive(Object o) throws Exception {
    if (o instanceof Exception) {
      throw (Exception) o;
    } else if (o instanceof Integer) {
      state = (Integer) o;
    } else if (o.equals("get")) {
      getSender().tell(state);
    } else {
      unhandled(o);
    }
  }
}

The test is easier to write using the utilities described in Testing Actor Systems (Scala), where TestProbe provides an actor ref that is useful for receiving and inspecting replies.

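import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.SupervisorStrategy;
import static akka.actor.SupervisorStrategy.*;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.UntypedActor;
import akka.dispatch.Await;
import static akka.pattern.Patterns.ask;
import akka.util.Duration;
import akka.testkit.AkkaSpec;
import akka.testkit.TestProbe;

// needed by the strategies, the preRestart override, SECONDS and the JUnit annotations
import akka.japi.Function;
import scala.Option;
import static java.util.concurrent.TimeUnit.SECONDS;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;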

public class FaultHandlingTestBase {
  static ActorSystem system;
  Duration timeout = Duration.create(5, SECONDS);

  @BeforeClass
  public static void start() {
    system = ActorSystem.create("test", AkkaSpec.testConf());
  }

  @AfterClass
  public static void cleanup() {
    system.shutdown();
  }

  @Test
  public void mustEmploySupervisorStrategy() throws Exception {
    // code here
  }

}

Let us create actors:

Props superprops = new Props(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);

The first test shall demonstrate the Resume directive, so we try it out by setting some non-initial state in the actor and then making it fail:

child.tell(42);
assert Await.result(ask(child, "get", 5000), timeout).equals(42);
child.tell(new ArithmeticException());
assert Await.result(ask(child, "get", 5000), timeout).equals(42);

As you can see the value 42 survives the fault handling directive. Now, if we change the failure to a more serious NullPointerException, that will no longer be the case:

child.tell(new NullPointerException());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
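
The difference between the two directives seen so far comes down to whether the failing actor instance is kept or replaced. The following plain-Java sketch models just that, without Akka: ChildModel and SupervisedSlot are hypothetical names, and the slot plays the role of the stable actor reference behind which the instance may change.

```java
// Hypothetical model, not Akka code: the state held by one actor instance.
final class ChildModel {
  int state = 0;
}

// Plays the role of the stable reference that outlives a restart.
final class SupervisedSlot {
  private ChildModel child = new ChildModel();

  ChildModel child() {
    return child;
  }

  // Resume: keep the same instance, so accumulated state survives
  void resume() {
    // intentionally empty
  }

  // Restart: swap in a fresh instance, so state is back at its initial value
  void restart() {
    child = new ChildModel();
  }
}
```

Setting the state to 42, resuming and reading it back yields 42 in this model, while a restart brings it back to 0, which mirrors the two assertions in the tests above.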

And finally in case of the fatal IllegalArgumentException the child will be terminated by the supervisor:

final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException());
probe.expectMsg(new Terminated(child));

Up to now the supervisor was completely unaffected by the child’s failure, because the configured directives handled it. In case of a plain Exception this is no longer true: the strategy has no matching case for it, so the supervisor escalates the failure.

child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception());
probe.expectMsg(new Terminated(child));

The supervisor itself is supervised by the top-level actor provided by the ActorSystem, whose default policy is to restart on any Exception (with the notable exceptions of ActorInitializationException and ActorKilledException). Since the default behavior during a restart is to kill all children, we expect our poor child not to survive this failure.

In case this is not desired (which depends on the use case), we need to use a different supervisor which overrides this behavior.

static public class Supervisor2 extends UntypedActor {

  private static SupervisorStrategy strategy = new OneForOneStrategy(10, Duration.parse("1 minute"),
      new Function<Throwable, Directive>() {
        @Override
        public Directive apply(Throwable t) {
          if (t instanceof ArithmeticException) {
            return resume();
          } else if (t instanceof NullPointerException) {
            return restart();
          } else if (t instanceof IllegalArgumentException) {
            return stop();
          } else {
            return escalate();
          }
        }
      });

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return strategy;
  }


  public void onReceive(Object o) {
    if (o instanceof Props) {
      getSender().tell(getContext().actorOf((Props) o));
    } else {
      unhandled(o);
    }
  }

  @Override
  public void preRestart(Throwable cause, Option<Object> msg) {
    // do not kill all children, which is the default here
  }
}

With this parent, the child survives the escalated restart, as demonstrated in the last test:

superprops = new Props(Supervisor2.class);
supervisor = system.actorOf(superprops, "supervisor2");
child = (ActorRef) Await.result(ask(supervisor, new Props(Child.class), 5000), timeout);
child.tell(23);
assert Await.result(ask(child, "get", 5000), timeout).equals(23);
child.tell(new Exception());
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
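
The role of the preRestart hook can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions and not Akka code: ParentModel, KeepChildrenParent and the list of child names are hypothetical, and stopping a child is reduced to removing its name from the list. The model also leaves out that surviving children are themselves restarted together with the parent, which is why the last test above reads 0 rather than 23.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a supervisor whose restart runs a preRestart hook.
class ParentModel {
  final List<String> children = new ArrayList<String>();

  // Default hook: stop (here: forget) all children before restarting
  void preRestart() {
    children.clear();
  }

  // The restart itself always goes through the hook
  final void restart() {
    preRestart();
  }
}

// Overrides the hook with an empty body, like Supervisor2 above
class KeepChildrenParent extends ParentModel {
  @Override
  void preRestart() {
    // do not stop the children
  }
}
```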
