Diagrams of the Fault Tolerance Sample - Version 2.4.20

Diagrams of the Fault Tolerance Sample

../_images/faulttolerancesample-normal-flow1.png

The above diagram illustrates the normal message flow.

Normal flow:

Step Description
1 The progress Listener starts the work.
2 The Worker schedules work by sending Do messages periodically to itself.
3, 4, 5 When receiving Do, the Worker tells the CounterService to increment the counter three times. The Increment message is forwarded to the Counter, which updates its counter variable and sends the current value to the Storage.
6, 7 The Worker asks the CounterService for the current value of the counter and pipes the result back to the Listener.
../_images/faulttolerancesample-failure-flow1.png

The above diagram illustrates what happens in case of storage failure.

Failure flow:

Step Description
1 The Storage throws StorageException.
2 The CounterService is the supervisor of the Storage and restarts the Storage when StorageException is thrown.
3, 4, 5, 6 The Storage continues to fail and is restarted.
7 After 3 failures and restarts within 5 seconds the Storage is stopped by its supervisor, i.e. the CounterService.
8 The CounterService is also watching the Storage for termination and receives the Terminated message when the Storage has been stopped ...
9, 10, 11 and tells the Counter that there is no Storage.
12 The CounterService schedules a Reconnect message to itself.
13, 14 When it receives the Reconnect message it creates a new Storage ...
15, 16 and tells the Counter to use the new Storage.

Full Source Code of the Fault Tolerance Sample

  1. import akka.actor._
  2. import akka.actor.SupervisorStrategy._
  3. import scala.concurrent.duration._
  4. import akka.util.Timeout
  5. import akka.event.LoggingReceive
  6. import akka.pattern.{ ask, pipe }
  7. import com.typesafe.config.ConfigFactory
  8.  
  /**
   * Entry point of the sample: builds the actor system with debug logging
   * enabled, creates the `Worker` and the `Listener`, and kicks off the work.
   */
  object FaultHandlingDocSample extends App {

    // Turn on DEBUG logging of received messages and actor lifecycle events.
    val config = ConfigFactory.parseString("""
      akka.loglevel = "DEBUG"
      akka.actor.debug {
        receive = on
        lifecycle = on
      }
      """)

    val system: ActorSystem = ActorSystem("FaultToleranceSample", config)
    val worker: ActorRef = system.actorOf(Props[Worker], "worker")
    val listener: ActorRef = system.actorOf(Props[Listener], "listener")

    // Send Start on behalf of the listener: because the listener is passed as
    // the sender, the worker's replies go to the listener.
    worker.tell(Worker.Start, listener)
  }
  31.  
  /**
   * Logs every `Progress` report it receives and terminates the actor system
   * either when the reported progress reaches 100 % or when the receive
   * timeout fires.
   */
  class Listener extends Actor with ActorLogging {
    import Worker.Progress

    // Without any message for 15 seconds the service is considered unavailable.
    context.setReceiveTimeout(15.seconds)

    def receive: Receive = {
      case Progress(percent) if percent >= 100.0 =>
        // Final report: log it, then shut everything down.
        log.info("Current progress: {} %", percent)
        log.info("That's all, shutting down")
        context.system.terminate()

      case Progress(percent) =>
        log.info("Current progress: {} %", percent)

      case ReceiveTimeout =>
        // The receive timeout elapsed without a message.
        log.error("Shutting down due to unavailable service")
        context.system.terminate()
    }
  }
  55.  
  /** Message protocol of the `Worker` actor. */
  object Worker {

    /** Asks the worker to begin; the sender becomes the progress listener. */
    case object Start

    /** Periodic tick the worker sends to itself to perform one unit of work. */
    case object Do

    /** Progress report; `percent` is the share of the total work done so far. */
    final case class Progress(percent: Double)
  }
  61.  
  /**
   * Performs work once it receives the `Start` message and keeps notifying the
   * sender of that `Start` message about the current `Progress`.
   * The `Worker` supervises the `CounterService`.
   */
  class Worker extends Actor with ActorLogging {
    import Worker._
    import CounterService._

    // Timeout applied to the `?` (ask) calls made towards the CounterService.
    implicit val askTimeout: Timeout = Timeout(5.seconds)

    // Stop the CounterService child if it throws ServiceUnavailable
    override val supervisorStrategy = OneForOneStrategy() {
      case _: CounterService.ServiceUnavailable => Stop
    }

    // The sender of the initial Start message; it is continuously notified
    // about progress. Empty until Start has been received.
    var progressListener: Option[ActorRef] = None
    val counterService: ActorRef = context.actorOf(Props[CounterService], name = "counter")
    val totalCount = 51

    import context.dispatcher // Use this Actor's dispatcher as ExecutionContext

    def receive: Receive = LoggingReceive {
      // Only the first Start is honoured; the guard rejects later ones.
      case Start if progressListener.isEmpty =>
        progressListener = Some(sender())
        // Send Do to ourselves immediately and then once every second.
        context.system.scheduler.schedule(Duration.Zero, 1.second, self, Do)

      case Do =>
        counterService ! Increment(1)
        counterService ! Increment(1)
        counterService ! Increment(1)

        // Ask for the current count and pipe the derived progress to the
        // listener. Using foreach avoids calling Option.get on the listener.
        progressListener.foreach { listener =>
          (counterService ? GetCurrentCount)
            .map { case CurrentCount(_, count) => Progress(100.0 * count / totalCount) }
            .pipeTo(listener)
        }
    }
  }
  100.  
  /** Message protocol and failure type of the `CounterService` actor. */
  object CounterService {

    /** Adds `n` to the counter. */
    final case class Increment(n: Int)

    /** Request for the current counter value. */
    sealed abstract class GetCurrentCount
    case object GetCurrentCount extends GetCurrentCount

    /** Reply carrying the counter's `key` and its current `count`. */
    final case class CurrentCount(key: String, count: Long)

    /** Thrown by the service when it cannot accept more requests. */
    class ServiceUnavailable(msg: String) extends RuntimeException(msg)

    // Internal message the service schedules to itself to re-create storage.
    private case object Reconnect
  }
  110.  
  /**
   * Adds the value received in an `Increment` message to a persistent counter
   * and replies with `CurrentCount` when asked with `GetCurrentCount`.
   * `CounterService` supervises `Storage` and `Counter`.
   */
  class CounterService extends Actor {
    import CounterService._
    import Counter._
    import Storage._

    // Restart the storage child when StorageException is thrown.
    // After 3 restarts within 5 seconds it will be stopped.
    override val supervisorStrategy = OneForOneStrategy(
      maxNrOfRetries = 3,
      withinTimeRange = 5.seconds) {
      case _: Storage.StorageException => Restart
    }

    val key: String = self.path.name
    var storage: Option[ActorRef] = None
    var counter: Option[ActorRef] = None
    // Requests (with their original senders) received before the Counter exists.
    var backlog = IndexedSeq.empty[(ActorRef, Any)]
    val MaxBacklog = 10000

    import context.dispatcher // Use this Actor's dispatcher as ExecutionContext

    override def preStart(): Unit = initStorage()

    /**
     * Creates and watches a new `Storage` child, hands it to the counter (if
     * there is one) and requests the stored value for `key`.
     *
     * The child storage is restarted in case of failure, but after 3 restarts,
     * and still failing, it will be stopped. Better to back off than to keep
     * failing. When it has been stopped we schedule a Reconnect after a delay.
     * Watching the child makes us receive Terminated when it has terminated.
     */
    def initStorage(): Unit = {
      val newStorage = context.watch(context.actorOf(Props[Storage], name = "storage"))
      storage = Some(newStorage)
      // Tell the counter, if any, to use the new storage
      counter.foreach(_ ! UseStorage(storage))
      // We need the initial value to be able to operate
      newStorage ! Get(key)
    }

    def receive: Receive = LoggingReceive {

      case Entry(k, v) if k == key && counter.isEmpty =>
        // Reply from Storage of the initial value, now we can create the Counter
        val c = context.actorOf(Props(classOf[Counter], key, v))
        counter = Some(c)
        // Tell the counter to use current storage
        c ! UseStorage(storage)
        // and send the buffered backlog to the counter, keeping original senders
        for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
        backlog = IndexedSeq.empty

      case msg: Increment => forwardOrPlaceInBacklog(msg)

      case msg: GetCurrentCount => forwardOrPlaceInBacklog(msg)

      case Terminated(actorRef) if storage.contains(actorRef) =>
        // After 3 restarts the storage child is stopped.
        // We receive Terminated because we watch the child, see initStorage.
        storage = None
        // Tell the counter that there is no storage for the moment
        counter.foreach(_ ! UseStorage(None))
        // Try to re-establish storage after a while
        context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

      case Reconnect =>
        // Re-establish storage after the scheduled delay
        initStorage()
    }

    /**
     * Forwards `msg` to the counter when it exists; otherwise buffers it,
     * together with its sender, until the counter has been created.
     *
     * We need the initial value from storage before we can start delegating
     * to the counter.
     *
     * @throws ServiceUnavailable if the backlog already holds `MaxBacklog` entries
     */
    def forwardOrPlaceInBacklog(msg: Any): Unit =
      counter match {
        case Some(c) => c forward msg
        case None =>
          if (backlog.size >= MaxBacklog)
            throw new ServiceUnavailable(
              "CounterService not available, lack of initial value")
          backlog :+= (sender() -> msg)
      }

  }
  200.  
  /** Message protocol of the `Counter` actor. */
  object Counter {

    /** Tells the counter which storage to use; `None` means no storage. */
    final case class UseStorage(storage: Option[ActorRef])
  }
  204.  
  /**
   * Holds the in-memory count and sends the current value to the `Storage`,
   * if there is any storage available at the moment.
   *
   * @param key          key under which the count is stored and reported
   * @param initialValue value the count starts from
   */
  class Counter(key: String, initialValue: Long) extends Actor {
    import Counter._
    import CounterService._
    import Storage._

    var count: Long = initialValue
    var storage: Option[ActorRef] = None

    def receive: Receive = LoggingReceive {
      case UseStorage(s) =>
        // Switch to the given storage (possibly none) and write the current
        // count to it.
        storage = s
        storeCount()

      case Increment(n) =>
        count += n
        storeCount()

      case GetCurrentCount =>
        sender() ! CurrentCount(key, count)

    }

    /**
     * Sends the current count to the storage, if one is set.
     *
     * Delegate dangerous work, to protect our valuable state.
     * We can continue without storage.
     */
    def storeCount(): Unit =
      storage.foreach(_ ! Store(Entry(key, count)))

  }
  239.  
  /** Message protocol and failure type of the `Storage` actor. */
  object Storage {

    /** Request to persist `entry`. */
    final case class Store(entry: Entry)

    /** Request for the value stored under `key`. */
    final case class Get(key: String)

    /** A key/value pair; used both in `Store` and as the reply to `Get`. */
    final case class Entry(key: String, value: Long)

    /** Thrown when the underlying data store fails. */
    class StorageException(msg: String) extends RuntimeException(msg)
  }
  246.  
  /**
   * Persists key/value pairs on `Store` and answers `Get` with the current
   * value for the requested key (0 when nothing is stored for it).
   * A `StorageException` thrown by the data store propagates out of `receive`.
   */
  class Storage extends Actor {
    import Storage._

    val db = DummyDB

    def receive: Receive = LoggingReceive {
      case Get(requestedKey) =>
        // A missing key is reported as 0.
        val storedValue = db.load(requestedKey).getOrElse(0L)
        sender() ! Entry(requestedKey, storedValue)

      case Store(Entry(entryKey, entryValue)) =>
        db.save(entryKey, entryValue)
    }
  }
  262.  
  /**
   * In-memory stand-in for a real data store. Access is synchronized on this
   * object. Saving a value in the range 11 to 14 (inclusive) fails on purpose
   * to simulate a store failure.
   */
  object DummyDB {
    import Storage.StorageException

    private var entries = Map.empty[String, Long]

    /** Stores `value` under `key`; throws for values from 11 to 14. */
    @throws(classOf[StorageException])
    def save(key: String, value: Long): Unit = synchronized {
      val simulateFailure = value >= 11 && value <= 14
      if (simulateFailure)
        throw new StorageException(s"Simulated store failure $value")
      entries = entries.updated(key, value)
    }

    /** Returns the value stored under `key`, if any. */
    @throws(classOf[StorageException])
    def load(key: String): Option[Long] = synchronized {
      entries.get(key)
    }
  }