Diagrams of the Fault Tolerance Sample

faulttolerancesample-normal-flow.png

The above diagram illustrates the normal message flow.

Normal flow:

Step Description
1 The progress Listener starts the work.
2 The Worker schedules work by sending Do messages periodically to itself
3, 4, 5 When receiving Do the Worker tells the CounterService to increment the counter, three times. The Increment message is forwarded to the Counter, which updates its counter variable and sends current value to the Storage.
6, 7 The Worker asks the CounterService of current value of the counter and pipes the result back to the Listener.

faulttolerancesample-failure-flow.png

The above diagram illustrates what happens in case of storage failure.

Failure flow:

Step Description
1 The Storage throws StorageException.
2 The CounterService is supervisor of the Storage and restarts the Storage when StorageException is thrown.
3, 4, 5, 6 The Storage continues to fail and is restarted.
7 After 3 failures and restarts within 5 seconds the Storage is stopped by its supervisor, i.e. the CounterService.
8 The CounterService is also watching the Storage for termination and receives the Terminated message when the Storage has been stopped …
9, 10, 11 and tells the Counter that there is no Storage.
12 The CounterService schedules a Reconnect message to itself.
13, 14 When it receives the Reconnect message it creates a new Storage
15, 16 and tells the Counter to use the new Storage

Full Source Code of the Fault Tolerance Sample

Scala
sourceimport akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._

  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker](), name = "worker")
  val listener = system.actorOf(Props[Listener](), name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {
  import Worker._
  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15 seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.terminate()
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.terminate()
  }
}

object Worker {
  case object Start
  case object Do
  final case class Progress(percent: Double)
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current ``Progress``. The `Worker` supervise the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit val askTimeout: Timeout = Timeout(5 seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService](), name = "counter")
  val totalCount = 51
  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.scheduleWithFixedDelay(Duration.Zero, 1 second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      (counterService ? GetCurrentCount)
        .map {
          case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
        }
        .pipeTo(progressListener.get)
  }
}

object CounterService {
  final case class Increment(n: Int)
  sealed abstract class GetCurrentCount
  case object GetCurrentCount extends GetCurrentCount
  final case class CurrentCount(key: String, count: Long)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect
}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervise `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  override def preStart(): Unit = {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back-off than continuously
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    storage = Some(context.watch(context.actorOf(Props[Storage](), name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter.foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {

    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg: Increment => forwardOrPlaceInBacklog(msg)

    case msg: GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter.foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegate to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c.forward(msg)
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable("CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }

}

object Counter {
  final case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)

  }

  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage.foreach { _ ! Store(Entry(key, count)) }
  }

}

object Storage {
  final case class Store(entry: Entry)
  final case class Get(key: String)
  final case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

object DummyDB {
  import Storage.StorageException
  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
Java
sourceimport static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.japi.Util.classTag;
import static akka.pattern.Patterns.pipe;
import static jdocs.actor.FaultHandlingDocSample.CounterApi.*;
import static jdocs.actor.FaultHandlingDocSample.CounterServiceApi.*;
import static jdocs.actor.FaultHandlingDocSample.StorageApi.*;
import static jdocs.actor.FaultHandlingDocSample.WorkerApi.*;

import akka.actor.*;
import akka.dispatch.Mapper;
import akka.event.LoggingReceive;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;


public class FaultHandlingDocSample {

  /** Runs the sample */
  public static void main(String[] args) {
    Config config =
        ConfigFactory.parseString(
            "akka.loglevel = \"DEBUG\"\n"
                + "akka.actor.debug {\n"
                + "  receive = on\n"
                + "  lifecycle = on\n"
                + "}\n");

    ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
    ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
    ActorRef listener = system.actorOf(Props.create(Listener.class), "listener");
    // start the work and listen on progress
    // note that the listener is used as sender of the tell,
    // i.e. it will receive replies from the worker
    worker.tell(Start, listener);
  }

  /**
   * Listens on progress from the worker and shuts down the system when enough work has been done.
   */
  public static class Listener extends AbstractLoggingActor {

    @Override
    public void preStart() {
      // If we don't get any progress within 15 seconds then the service
      // is unavailable
      getContext().setReceiveTimeout(Duration.ofSeconds(15));
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Progress.class,
                  progress -> {
                    log().info("Current progress: {} %", progress.percent);
                    if (progress.percent >= 100.0) {
                      log().info("That's all, shutting down");
                      getContext().getSystem().terminate();
                    }
                  })
              .matchEquals(
                  ReceiveTimeout.getInstance(),
                  x -> {
                    // No progress within 15 seconds, ServiceUnavailable
                    log().error("Shutting down due to unavailable service");
                    getContext().getSystem().terminate();
                  })
              .build(),
          getContext());
    }
  }

  public interface WorkerApi {
    public static final Object Start = "Start";
    public static final Object Do = "Do";

    public static class Progress {
      public final double percent;

      public Progress(double percent) {
        this.percent = percent;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), percent);
      }
    }
  }


  /**
   * Worker performs some work when it receives the Start message. It will continuously notify the
   * sender of the Start message of current Progress. The Worker supervise the CounterService.
   */
  public static class Worker extends AbstractLoggingActor {
    final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5));

    // The sender of the initial Start message will continuously be notified
    // about progress
    ActorRef progressListener;
    final ActorRef counterService =
        getContext().actorOf(Props.create(CounterService.class), "counter");
    final int totalCount = 51;

    // Stop the CounterService child if it throws ServiceUnavailable
    private static final SupervisorStrategy strategy =
        new OneForOneStrategy(
            DeciderBuilder.match(ServiceUnavailable.class, e -> stop())
                .matchAny(o -> escalate())
                .build());

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .matchEquals(
                  Start,
                  x -> progressListener == null,
                  x -> {
                    progressListener = getSender();
                    getContext()
                        .getSystem()
                        .scheduler()
                        .scheduleWithFixedDelay(
                            Duration.ZERO,
                            Duration.ofSeconds(1L),
                            getSelf(),
                            Do,
                            getContext().getDispatcher(),
                            null);
                  })
              .matchEquals(
                  Do,
                  x -> {
                    counterService.tell(new Increment(1), getSelf());
                    counterService.tell(new Increment(1), getSelf());
                    counterService.tell(new Increment(1), getSelf());
                    // Send current progress to the initial sender
                    pipe(
                            Patterns.ask(counterService, GetCurrentCount, askTimeout)
                                .mapTo(classTag(CurrentCount.class))
                                .map(
                                    new Mapper<CurrentCount, Progress>() {
                                      public Progress apply(CurrentCount c) {
                                        return new Progress(100.0 * c.count / totalCount);
                                      }
                                    },
                                    getContext().dispatcher()),
                            getContext().dispatcher())
                        .to(progressListener);
                  })
              .build(),
          getContext());
    }
  }

  public interface CounterServiceApi {

    public static final Object GetCurrentCount = "GetCurrentCount";

    public static class CurrentCount {
      public final String key;
      public final long count;

      public CurrentCount(String key, long count) {
        this.key = key;
        this.count = count;
      }

      public String toString() {
        return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
      }
    }

    public static class Increment {
      public final long n;

      public Increment(long n) {
        this.n = n;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), n);
      }
    }

    public static class ServiceUnavailable extends RuntimeException {
      private static final long serialVersionUID = 1L;

      public ServiceUnavailable(String msg) {
        super(msg);
      }
    }
  }


  /**
   * Adds the value received in Increment message to a persistent counter. Replies with CurrentCount
   * when it is asked for CurrentCount. CounterService supervise Storage and Counter.
   */
  public static class CounterService extends AbstractLoggingActor {

    // Reconnect message
    static final Object Reconnect = "Reconnect";

    private static class SenderMsgPair {
      final ActorRef sender;
      final Object msg;

      SenderMsgPair(ActorRef sender, Object msg) {
        this.msg = msg;
        this.sender = sender;
      }
    }

    final String key = getSelf().path().name();
    ActorRef storage;
    ActorRef counter;
    final List<SenderMsgPair> backlog = new ArrayList<>();
    final int MAX_BACKLOG = 10000;

    // Restart the storage child when StorageException is thrown.
    // After 3 restarts within 5 seconds it will be stopped.
    private static final SupervisorStrategy strategy =
        new OneForOneStrategy(
            3,
            Duration.ofSeconds(5),
            DeciderBuilder.match(StorageException.class, e -> restart())
                .matchAny(o -> escalate())
                .build());

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    @Override
    public void preStart() {
      initStorage();
    }

    /**
     * The child storage is restarted in case of failure, but after 3 restarts, and still failing it
     * will be stopped. Better to back-off than continuously failing. When it has been stopped we
     * will schedule a Reconnect after a delay. Watch the child so we receive Terminated message
     * when it has been terminated.
     */
    void initStorage() {
      storage = getContext().watch(getContext().actorOf(Props.create(Storage.class), "storage"));
      // Tell the counter, if any, to use the new storage
      if (counter != null) counter.tell(new UseStorage(storage), getSelf());
      // We need the initial value to be able to operate
      storage.tell(new Get(key), getSelf());
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Entry.class,
                  entry -> entry.key.equals(key) && counter == null,
                  entry -> {
                    // Reply from Storage of the initial value, now we can create the Counter
                    final long value = entry.value;
                    counter = getContext().actorOf(Props.create(Counter.class, key, value));
                    // Tell the counter to use current storage
                    counter.tell(new UseStorage(storage), getSelf());
                    // and send the buffered backlog to the counter
                    for (SenderMsgPair each : backlog) {
                      counter.tell(each.msg, each.sender);
                    }
                    backlog.clear();
                  })
              .match(
                  Increment.class,
                  increment -> {
                    forwardOrPlaceInBacklog(increment);
                  })
              .matchEquals(
                  GetCurrentCount,
                  gcc -> {
                    forwardOrPlaceInBacklog(gcc);
                  })
              .match(
                  Terminated.class,
                  o -> {
                    // After 3 restarts the storage child is stopped.
                    // We receive Terminated because we watch the child, see initStorage.
                    storage = null;
                    // Tell the counter that there is no storage for the moment
                    counter.tell(new UseStorage(null), getSelf());
                    // Try to re-establish storage after while
                    getContext()
                        .getSystem()
                        .scheduler()
                        .scheduleOnce(
                            Duration.ofSeconds(10),
                            getSelf(),
                            Reconnect,
                            getContext().getDispatcher(),
                            null);
                  })
              .matchEquals(
                  Reconnect,
                  o -> {
                    // Re-establish storage after the scheduled delay
                    initStorage();
                  })
              .build(),
          getContext());
    }

    void forwardOrPlaceInBacklog(Object msg) {
      // We need the initial value from storage before we can start delegate to
      // the counter. Before that we place the messages in a backlog, to be sent
      // to the counter when it is initialized.
      if (counter == null) {
        if (backlog.size() >= MAX_BACKLOG)
          throw new ServiceUnavailable("CounterService not available," + " lack of initial value");
        backlog.add(new SenderMsgPair(getSender(), msg));
      } else {
        counter.forward(msg, getContext());
      }
    }
  }

  public interface CounterApi {
    public static class UseStorage {
      public final ActorRef storage;

      public UseStorage(ActorRef storage) {
        this.storage = storage;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), storage);
      }
    }
  }


  /**
   * The in memory count variable that will send current value to the Storage, if there is any
   * storage available at the moment.
   */
  public static class Counter extends AbstractLoggingActor {
    final String key;
    long count;
    ActorRef storage;

    public Counter(String key, long initialValue) {
      this.key = key;
      this.count = initialValue;
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  UseStorage.class,
                  useStorage -> {
                    storage = useStorage.storage;
                    storeCount();
                  })
              .match(
                  Increment.class,
                  increment -> {
                    count += increment.n;
                    storeCount();
                  })
              .matchEquals(
                  GetCurrentCount,
                  gcc -> {
                    getSender().tell(new CurrentCount(key, count), getSelf());
                  })
              .build(),
          getContext());
    }

    void storeCount() {
      // Delegate dangerous work, to protect our valuable state.
      // We can continue without storage.
      if (storage != null) {
        storage.tell(new Store(new Entry(key, count)), getSelf());
      }
    }
  }

  public interface StorageApi {

    public static class Store {
      public final Entry entry;

      public Store(Entry entry) {
        this.entry = entry;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), entry);
      }
    }

    public static class Entry {
      public final String key;
      public final long value;

      public Entry(String key, long value) {
        this.key = key;
        this.value = value;
      }

      public String toString() {
        return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
      }
    }

    public static class Get {
      public final String key;

      public Get(String key) {
        this.key = key;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), key);
      }
    }

    public static class StorageException extends RuntimeException {
      private static final long serialVersionUID = 1L;

      public StorageException(String msg) {
        super(msg);
      }
    }
  }


  /**
   * Saves key/value pairs to persistent storage when receiving Store message. Replies with current
   * value when receiving Get message. Will throw StorageException if the underlying data store is
   * out of order.
   */
  public static class Storage extends AbstractLoggingActor {

    final DummyDB db = DummyDB.instance;

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Store.class,
                  store -> {
                    db.save(store.entry.key, store.entry.value);
                  })
              .match(
                  Get.class,
                  get -> {
                    Long value = db.load(get.key);
                    getSender()
                        .tell(
                            new Entry(get.key, value == null ? Long.valueOf(0L) : value),
                            getSelf());
                  })
              .build(),
          getContext());
    }
  }

  public static class DummyDB {
    public static final DummyDB instance = new DummyDB();
    private final Map<String, Long> db = new HashMap<String, Long>();

    private DummyDB() {}

    public synchronized void save(String key, Long value) throws StorageException {
      if (11 <= value && value <= 14)
        throw new StorageException("Simulated store failure " + value);
      db.put(key, value);
    }

    public synchronized Long load(String key) throws StorageException {
      return db.get(key);
    }
  }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.