Loading
Diagrams of the Fault Tolerance Sample

The above diagram illustrates the normal message flow.
Normal flow:
Step | Description |
---|---|
1 | The progress Listener starts the work. |
2 | The Worker schedules work by sending Do messages periodically to itself |
3, 4, 5 | When receiving Do the Worker tells the CounterService
to increment the counter, three times. The Increment message is forwarded
to the Counter , which updates its counter variable and sends current value
to the Storage . |
6, 7 | The Worker asks the CounterService of current value of the counter and pipes
the result back to the Listener . |

The above diagram illustrates what happens in case of storage failure.
Failure flow:
Step | Description |
---|---|
1 | The Storage throws StorageException . |
2 | The CounterService is supervisor of the Storage and restarts the
Storage when StorageException is thrown. |
3, 4, 5, 6 | The Storage continues to fail and is restarted. |
7 | After 3 failures and restarts within 5 seconds the Storage is stopped by its
supervisor, i.e. the CounterService . |
8 | The CounterService is also watching the Storage for termination and
receives the Terminated message when the Storage has been stopped ... |
9, 10, 11 | and tells the Counter that there is no Storage . |
12 | The CounterService schedules a Reconnect message to itself. |
13, 14 | When it receives the Reconnect message it creates a new Storage ... |
15, 16 | and tells the Counter to use the new Storage |
Full Source Code of the Fault Tolerance Sample
- import akka.actor._
- import akka.actor.SupervisorStrategy._
- import scala.concurrent.duration._
- import akka.util.Timeout
- import akka.event.LoggingReceive
- import akka.pattern.{ ask, pipe }
- import com.typesafe.config.ConfigFactory
-
- /**
- * Runs the sample
- */
- object FaultHandlingDocSample extends App {
- import Worker._
-
- val config = ConfigFactory.parseString("""
- akka.loglevel = "DEBUG"
- akka.actor.debug {
- receive = on
- lifecycle = on
- }
- """)
-
- val system = ActorSystem("FaultToleranceSample", config)
- val worker = system.actorOf(Props[Worker], name = "worker")
- val listener = system.actorOf(Props[Listener], name = "listener")
- // start the work and listen on progress
- // note that the listener is used as sender of the tell,
- // i.e. it will receive replies from the worker
- worker.tell(Start, sender = listener)
- }
-
- /**
- * Listens on progress from the worker and shuts down the system when enough
- * work has been done.
- */
- class Listener extends Actor with ActorLogging {
- import Worker._
- // If we don't get any progress within 15 seconds then the service is unavailable
- context.setReceiveTimeout(15 seconds)
-
- def receive = {
- case Progress(percent) =>
- log.info("Current progress: {} %", percent)
- if (percent >= 100.0) {
- log.info("That's all, shutting down")
- context.system.shutdown()
- }
-
- case ReceiveTimeout =>
- // No progress within 15 seconds, ServiceUnavailable
- log.error("Shutting down due to unavailable service")
- context.system.shutdown()
- }
- }
-
- object Worker {
- case object Start
- case object Do
- case class Progress(percent: Double)
- }
-
- /**
- * Worker performs some work when it receives the `Start` message.
- * It will continuously notify the sender of the `Start` message
- * of current ``Progress``. The `Worker` supervise the `CounterService`.
- */
- class Worker extends Actor with ActorLogging {
- import Worker._
- import CounterService._
- implicit val askTimeout = Timeout(5 seconds)
-
- // Stop the CounterService child if it throws ServiceUnavailable
- override val supervisorStrategy = OneForOneStrategy() {
- case _: CounterService.ServiceUnavailable => Stop
- }
-
- // The sender of the initial Start message will continuously be notified
- // about progress
- var progressListener: Option[ActorRef] = None
- val counterService = context.actorOf(Props[CounterService], name = "counter")
- val totalCount = 51
- import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
-
- def receive = LoggingReceive {
- case Start if progressListener.isEmpty =>
- progressListener = Some(sender)
- context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
-
- case Do =>
- counterService ! Increment(1)
- counterService ! Increment(1)
- counterService ! Increment(1)
-
- // Send current progress to the initial sender
- counterService ? GetCurrentCount map {
- case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
- } pipeTo progressListener.get
- }
- }
-
- object CounterService {
- case class Increment(n: Int)
- case object GetCurrentCount
- case class CurrentCount(key: String, count: Long)
- class ServiceUnavailable(msg: String) extends RuntimeException(msg)
-
- private case object Reconnect
- }
-
- /**
- * Adds the value received in `Increment` message to a persistent
- * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
- * `CounterService` supervise `Storage` and `Counter`.
- */
- class CounterService extends Actor {
- import CounterService._
- import Counter._
- import Storage._
-
- // Restart the storage child when StorageException is thrown.
- // After 3 restarts within 5 seconds it will be stopped.
- override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
- withinTimeRange = 5 seconds) {
- case _: Storage.StorageException => Restart
- }
-
- val key = self.path.name
- var storage: Option[ActorRef] = None
- var counter: Option[ActorRef] = None
- var backlog = IndexedSeq.empty[(ActorRef, Any)]
- val MaxBacklog = 10000
-
- import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext
-
- override def preStart() {
- initStorage()
- }
-
- /**
- * The child storage is restarted in case of failure, but after 3 restarts,
- * and still failing it will be stopped. Better to back-off than continuously
- * failing. When it has been stopped we will schedule a Reconnect after a delay.
- * Watch the child so we receive Terminated message when it has been terminated.
- */
- def initStorage() {
- storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
- // Tell the counter, if any, to use the new storage
- counter foreach { _ ! UseStorage(storage) }
- // We need the initial value to be able to operate
- storage.get ! Get(key)
- }
-
- def receive = LoggingReceive {
-
- case Entry(k, v) if k == key && counter == None =>
- // Reply from Storage of the initial value, now we can create the Counter
- val c = context.actorOf(Props(classOf[Counter], key, v))
- counter = Some(c)
- // Tell the counter to use current storage
- c ! UseStorage(storage)
- // and send the buffered backlog to the counter
- for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
- backlog = IndexedSeq.empty
-
- case msg @ Increment(n) => forwardOrPlaceInBacklog(msg)
-
- case msg @ GetCurrentCount => forwardOrPlaceInBacklog(msg)
-
- case Terminated(actorRef) if Some(actorRef) == storage =>
- // After 3 restarts the storage child is stopped.
- // We receive Terminated because we watch the child, see initStorage.
- storage = None
- // Tell the counter that there is no storage for the moment
- counter foreach { _ ! UseStorage(None) }
- // Try to re-establish storage after while
- context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
-
- case Reconnect =>
- // Re-establish storage after the scheduled delay
- initStorage()
- }
-
- def forwardOrPlaceInBacklog(msg: Any) {
- // We need the initial value from storage before we can start delegate to
- // the counter. Before that we place the messages in a backlog, to be sent
- // to the counter when it is initialized.
- counter match {
- case Some(c) => c forward msg
- case None =>
- if (backlog.size >= MaxBacklog)
- throw new ServiceUnavailable(
- "CounterService not available, lack of initial value")
- backlog :+= (sender() -> msg)
- }
- }
-
- }
-
- object Counter {
- case class UseStorage(storage: Option[ActorRef])
- }
-
- /**
- * The in memory count variable that will send current
- * value to the `Storage`, if there is any storage
- * available at the moment.
- */
- class Counter(key: String, initialValue: Long) extends Actor {
- import Counter._
- import CounterService._
- import Storage._
-
- var count = initialValue
- var storage: Option[ActorRef] = None
-
- def receive = LoggingReceive {
- case UseStorage(s) =>
- storage = s
- storeCount()
-
- case Increment(n) =>
- count += n
- storeCount()
-
- case GetCurrentCount =>
- sender() ! CurrentCount(key, count)
-
- }
-
- def storeCount() {
- // Delegate dangerous work, to protect our valuable state.
- // We can continue without storage.
- storage foreach { _ ! Store(Entry(key, count)) }
- }
-
- }
-
- object Storage {
- case class Store(entry: Entry)
- case class Get(key: String)
- case class Entry(key: String, value: Long)
- class StorageException(msg: String) extends RuntimeException(msg)
- }
-
- /**
- * Saves key/value pairs to persistent storage when receiving `Store` message.
- * Replies with current value when receiving `Get` message.
- * Will throw StorageException if the underlying data store is out of order.
- */
- class Storage extends Actor {
- import Storage._
-
- val db = DummyDB
-
- def receive = LoggingReceive {
- case Store(Entry(key, count)) => db.save(key, count)
- case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
- }
- }
-
- object DummyDB {
- import Storage.StorageException
- private var db = Map[String, Long]()
-
- @throws(classOf[StorageException])
- def save(key: String, value: Long): Unit = synchronized {
- if (11 <= value && value <= 14)
- throw new StorageException("Simulated store failure " + value)
- db += (key -> value)
- }
-
- @throws(classOf[StorageException])
- def load(key: String): Option[Long] = synchronized {
- db.get(key)
- }
- }