Fault Tolerance (Scala)

As explained in Actor Systems, each actor is the supervisor of its children, and as such each actor defines a fault handling supervisor strategy. This strategy cannot be changed afterwards, as it is an integral part of the actor system’s structure.
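As a minimal sketch of what this means in practice (the `Backend` and `BackendSupervisor` names below are made up for illustration and are not part of the sample that follows), the strategy is declared as part of the supervising actor's definition and applies to the children created by that actor:

import akka.actor.{ Actor, Props, OneForOneStrategy }
import akka.actor.SupervisorStrategy._
import akka.util.duration._

// Illustration only: a child doing potentially dangerous work
class Backend extends Actor {
  def receive = {
    case msg ⇒ // the potentially dangerous work would go here
  }
}

class BackendSupervisor extends Actor {
  // The strategy is declared together with the actor and applies to the children created below
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1 minute) {
      case _: RuntimeException ⇒ Restart
    }

  val backend = context.actorOf(Props[Backend], name = "backend")

  def receive = {
    case msg ⇒ backend forward msg
  }
}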

Fault Handling in Practice

First, let us look at a sample that illustrates one way to handle data store errors, which is a typical source of failure in real world applications. Of course, what can be done when the data store is unavailable depends on the actual application, but in this sample we use a best-effort re-connect approach.

Read the following source code. The inlined comments explain the different pieces of the fault handling and why they are added. It is also highly recommended to run this sample, as it is easy to follow the log output to understand what is happening at runtime.

// imports ...

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._

  val config = ConfigFactory.parseString("""
    akka.loglevel = DEBUG
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker], name = "worker")
  val listener = system.actorOf(Props[Listener], name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {
  import Worker._
  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15 seconds)

  def receive = {
    case Progress(percent) ⇒
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.shutdown()
      }

    case ReceiveTimeout ⇒
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.shutdown()
  }
}

// messages ...

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current ``Progress``. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit val askTimeout = Timeout(5 seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable ⇒ Stop
  }

  // The sender of the initial Start message will continuously be notified about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService], name = "counter")
  val totalCount = 51

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty ⇒
      progressListener = Some(sender)
      context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)

    case Do ⇒
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      counterService ? GetCurrentCount map {
        case CurrentCount(_, count) ⇒ Progress(100.0 * count / totalCount)
      } pipeTo progressListener.get
  }
}

// messages ...

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
    case _: Storage.StorageException ⇒ Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  override def preStart() {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back-off than continuously
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage() {
    storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {

    case Entry(k, v) if k == key && counter == None ⇒
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(new Counter(key, v)))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) ← backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg @ Increment(n)    ⇒ forwardOrPlaceInBacklog(msg)

    case msg @ GetCurrentCount ⇒ forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage ⇒
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)

    case Reconnect ⇒
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any) {
    // We need the initial value from storage before we can start delegating to the counter.
    // Before that we place the messages in a backlog, to be sent to the counter when
    // it is initialized.
    counter match {
      case Some(c) ⇒ c forward msg
      case None ⇒
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable("CounterService not available, lack of initial value")
        backlog = backlog :+ (sender, msg)
    }
  }

}

// messages ...

/**
 * The in memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) ⇒
      storage = s
      storeCount()

    case Increment(n) ⇒
      count += n
      storeCount()

    case GetCurrentCount ⇒
      sender ! CurrentCount(key, count)

  }

  def storeCount() {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach { _ ! Store(Entry(key, count)) }
  }

}

// messages ...

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) ⇒ db.save(key, count)
    case Get(key)                 ⇒ sender ! Entry(key, db.load(key).getOrElse(0L))
  }
}

// dummydb ...

Creating a Supervisor Strategy

The following sections explain the fault handling mechanism and alternatives in more depth.

For the sake of demonstration let us consider the following strategy:

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import akka.util.duration._

override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
  case _: ArithmeticException      ⇒ Resume
  case _: NullPointerException     ⇒ Restart
  case _: IllegalArgumentException ⇒ Stop
  case _: Exception                ⇒ Escalate
}

I have chosen a few well-known exception types in order to demonstrate the application of the fault handling directives described in Supervision and Monitoring. First off, it is a one-for-one strategy, meaning that each child is treated separately (an all-for-one strategy works very similarly; the only difference is that any decision is applied to all children of the supervisor, not only the failing one). There are limits set on the restart frequency, namely a maximum of 10 restarts per minute; either of these settings could be left out, which means that the respective limit does not apply, leaving the possibility to specify an absolute upper limit on the restarts or to allow an unlimited number of restarts.

The match statement which forms the bulk of the body is of type Decider, which is a PartialFunction[Throwable, Directive]. This is the piece which maps child failure types to their corresponding directives.
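Since a Decider is an ordinary partial function, it can be defined once and passed to either strategy type. The following sketch (our own illustration, not part of the sample) also shows the all-for-one variant mentioned above:

import akka.actor.{ OneForOneStrategy, AllForOneStrategy }
import akka.actor.SupervisorStrategy._
import akka.util.duration._

// Illustration only: one decider reused with both strategy types
val decider: PartialFunction[Throwable, Directive] = {
  case _: ArithmeticException      ⇒ Resume
  case _: NullPointerException     ⇒ Restart
  case _: IllegalArgumentException ⇒ Stop
  case _: Exception                ⇒ Escalate
}

// Decisions applied only to the failing child ...
val oneForOne = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute)(decider)
// ... or to all children of the supervisor when any one of them fails
val allForOne = AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute)(decider)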

Default Supervisor Strategy

Escalate is used if the defined strategy doesn’t cover the exception that was thrown.

When the supervisor strategy is not defined for an actor the following exceptions are handled by default:

  • ActorInitializationException will stop the failing child actor
  • ActorKilledException will stop the failing child actor
  • Exception will restart the failing child actor
  • Other types of Throwable will be escalated to the parent actor

If the exception escalates all the way up to the root guardian, the root guardian will handle it in the same way as the default strategy defined above.
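For illustration only, the default decisions listed above could be spelled out as an explicit strategy along the following lines (a sketch of the rules, not the library's actual implementation):

import akka.actor.{ ActorInitializationException, ActorKilledException, OneForOneStrategy }
import akka.actor.SupervisorStrategy._

// Sketch of the default decisions, written out as a decider; not the real library code
val defaultLikeStrategy = OneForOneStrategy() {
  case _: ActorInitializationException ⇒ Stop
  case _: ActorKilledException         ⇒ Stop
  case _: Exception                    ⇒ Restart
  case _                               ⇒ Escalate
}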

Test Application

The following section shows the effects of the different directives in practice, for which a test setup is needed. First off, we need a suitable supervisor:

import akka.actor.Actor

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import akka.util.duration._

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      ⇒ Resume
    case _: NullPointerException     ⇒ Restart
    case _: IllegalArgumentException ⇒ Stop
    case _: Exception                ⇒ Escalate
  }

  def receive = {
    case p: Props ⇒ sender ! context.actorOf(p)
  }
}

This supervisor will be used to create a child, with which we can experiment:

import akka.actor.Actor

class Child extends Actor {
  var state = 0
  def receive = {
    case ex: Exception ⇒ throw ex
    case x: Int        ⇒ state = x
    case "get"         ⇒ sender ! state
  }
}

Testing is easier with the utilities described in Testing Actor Systems (Scala), where AkkaSpec is a convenient mixture of TestKit with WordSpec with MustMatchers.

import akka.testkit.{ AkkaSpec, ImplicitSender, EventFilter }
import akka.actor.{ ActorRef, Props, Terminated }

class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {

  "A supervisor" must {

    "apply the chosen strategy for its child" in {
      // code here
    }
  }
}

Let us create actors:

val supervisor = system.actorOf(Props[Supervisor], "supervisor")

supervisor ! Props[Child]
val child = expectMsgType[ActorRef] // retrieve answer from TestKit’s testActor

The first test shall demonstrate the Resume directive, so we try it out by setting some non-initial state in the actor and have it fail:

child ! 42 // set state to 42
child ! "get"
expectMsg(42)

child ! new ArithmeticException // crash it
child ! "get"
expectMsg(42)

As you can see the value 42 survives the fault handling directive. Now, if we change the failure to a more serious NullPointerException, that will no longer be the case:

child ! new NullPointerException // crash it harder
child ! "get"
expectMsg(0)

And finally in case of the fatal IllegalArgumentException the child will be terminated by the supervisor:

watch(child) // have testActor watch “child”
child ! new IllegalArgumentException // break it
expectMsg(Terminated(child))
child.isTerminated must be(true)

Up to now the supervisor was completely unaffected by the child’s failure, because the configured directives handled it. In case of an Exception, this is not true anymore and the supervisor escalates the failure.

supervisor ! Props[Child] // create new child
val child2 = expectMsgType[ActorRef]

watch(child2)
child2 ! "get" // verify it is alive
expectMsg(0)

child2 ! new Exception("CRASH") // escalate failure
expectMsg(Terminated(child2))

The supervisor itself is supervised by the top-level actor provided by the ActorSystem, whose default policy is to restart in case of all Exceptions (with the notable exceptions of ActorInitializationException and ActorKilledException). Since the default directive in case of a restart is to kill all children, we expect our poor child not to survive this failure.
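For reference, the default preRestart hook behaves roughly like the following sketch (an approximation of the default behaviour, not necessarily the exact library code), which is why a restart of the supervisor takes its children down with it:

import akka.actor.Actor

// Illustration only: approximately what the default preRestart does
class DefaultRestartBehaviour extends Actor {
  override def preRestart(reason: Throwable, message: Option[Any]) {
    context.children foreach context.stop // stop every child before the restart
    postStop()                            // then run the normal postStop hook
  }
  def receive = {
    case _ ⇒ // not relevant for this illustration
  }
}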

In case this is not desired (which depends on the use case), we need to use a different supervisor which overrides this behavior.

class Supervisor2 extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import akka.util.duration._

  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException      ⇒ Resume
    case _: NullPointerException     ⇒ Restart
    case _: IllegalArgumentException ⇒ Stop
    case _: Exception                ⇒ Escalate
  }

  def receive = {
    case p: Props ⇒ sender ! context.actorOf(p)
  }
  // override the default preRestart behaviour, which would stop all children during restart
  override def preRestart(cause: Throwable, msg: Option[Any]) {}
}

With this parent, the child survives the escalated restart, as demonstrated in the last test:

val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

supervisor2 ! Props[Child]
val child3 = expectMsgType[ActorRef]

child3 ! 23
child3 ! "get"
expectMsg(23)

child3 ! new Exception("CRASH")
child3 ! "get"
expectMsg(0)
