The above diagram illustrates the normal message flow.
Normal flow:

| Step    | Description |
|---------|-------------|
| 1       | The progress Listener starts the work. |
| 2       | The Worker schedules work by periodically sending Do messages to itself. |
| 3, 4, 5 | When it receives Do, the Worker tells the CounterService to increment the counter three times. The Increment message is forwarded to the Counter, which updates its counter variable and sends the current value to the Storage. |
| 6, 7    | The Worker asks the CounterService for the current value of the counter and pipes the result back to the Listener. |
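You can check the arithmetic behind the normal flow without Akka. The plain-Java sketch below computes the Progress percentage the Worker reports, using the sample's totalCount of 51 and three Increment(1) messages per Do tick. The class ProgressModel and its methods are hypothetical and not part of the sample. The sketch assumes the count starts at 0 and that every increment of a tick is counted before that tick's GetCurrentCount is answered.

```java
/** Hypothetical plain-Java model of the Worker's progress arithmetic (no Akka involved). */
final class ProgressModel {
  static final int TOTAL_COUNT = 51;        // same value as totalCount in the Worker
  static final int INCREMENTS_PER_TICK = 3; // the Worker sends Increment(1) three times per Do

  /** Progress reported after the given number of Do ticks, assuming the count starts at 0. */
  static double progressAfterTicks(int ticks) {
    long count = (long) ticks * INCREMENTS_PER_TICK;
    return 100.0 * count / TOTAL_COUNT;
  }

  /** Number of Do ticks until the Listener sees progress >= 100.0 and terminates the system. */
  static int ticksUntilDone() {
    int ticks = 0;
    while (progressAfterTicks(ticks) < 100.0) {
      ticks++;
    }
    return ticks;
  }

  public static void main(String[] args) {
    System.out.println("Ticks until done: " + ticksUntilDone());
  }
}
```

Under these assumptions, ticksUntilDone() returns 17. Three increments per tick reach the total of 51 on the seventeenth Do message.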
The above diagram illustrates what happens in case of storage failure.
Failure flow:

| Step       | Description |
|------------|-------------|
| 1          | The Storage throws StorageException. |
| 2          | The CounterService is the supervisor of the Storage and restarts the Storage when StorageException is thrown. |
| 3, 4, 5, 6 | The Storage continues to fail and is restarted. |
| 7          | After 3 restarts within 5 seconds, the next failure causes the Storage to be stopped by its supervisor, i.e. the CounterService. |
| 8          | The CounterService is also watching the Storage for termination and receives the Terminated message when the Storage has been stopped … |
| 9, 10, 11  | and tells the Counter that there is no Storage. |
| 12         | The CounterService schedules a Reconnect message to itself, delivered after a delay (10 seconds in the code below). |
| 13, 14     | When it receives the Reconnect message it creates a new Storage … |
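The supervision rule in steps 1 to 7 can be modeled as a small state machine: restart on StorageException, at most 3 restarts within 5 seconds, then stop. The sketch below is plain Java with hypothetical names. It is not Akka's implementation of OneForOneStrategy. It assumes a fixed window that opens at the first failure and is reset by the first failure that arrives after the window has expired.

```java
/**
 * Hypothetical model of a "max retries within a time range" supervision rule.
 * Not Akka code: it only mirrors the behavior described in the failure flow.
 */
final class RestartWindow {
  enum Directive { RESTART, STOP }

  private final int maxRetries;
  private final long windowMillis;
  private long windowStartMillis = -1; // -1 means no failure seen yet
  private int failuresInWindow = 0;

  RestartWindow(int maxRetries, long windowMillis) {
    this.maxRetries = maxRetries;
    this.windowMillis = windowMillis;
  }

  /** Decides what the supervisor does with a child that failed at nowMillis. */
  Directive onFailure(long nowMillis) {
    boolean windowExpired =
        windowStartMillis < 0 || nowMillis - windowStartMillis > windowMillis;
    if (windowExpired) {
      // First failure, or the previous window is over: open a new window.
      windowStartMillis = nowMillis;
      failuresInWindow = 1;
    } else {
      failuresInWindow++;
    }
    // Each failure requests a restart; allow it only while within the limit.
    return failuresInWindow <= maxRetries ? Directive.RESTART : Directive.STOP;
  }

  public static void main(String[] args) {
    RestartWindow storageSupervision = new RestartWindow(3, 5_000);
    long[] failureTimes = {0, 300, 600, 1_000}; // four failures within one second
    for (long t : failureTimes) {
      System.out.println(t + " ms -> " + storageSupervision.onFailure(t));
    }
  }
}
```

With the sample's parameters, main reports RESTART for the first three failures and STOP for the fourth, which matches step 7. The Storage created later by Reconnect is a new child actor, so in this model it would start with a fresh RestartWindow.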
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {
  import Worker._

  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.actor.debug {
      receive = on
      lifecycle = on
    }
    """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker](), name = "worker")
  val listener = system.actorOf(Props[Listener](), name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {
  import Worker._
  // If we don't get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15 seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That's all, shutting down")
        context.system.terminate()
      }

    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.terminate()
  }
}

object Worker {
  case object Start
  case object Do
  final case class Progress(percent: Double)
}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._
  implicit val askTimeout: Timeout = Timeout(5 seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService](), name = "counter")
  val totalCount = 51
  import context.dispatcher // Use this Actor's Dispatcher as ExecutionContext

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.scheduleWithFixedDelay(Duration.Zero, 1 second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender
      (counterService ? GetCurrentCount)
        .map {
          case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
        }
        .pipeTo(progressListener.get)
  }
}

object CounterService {
  final case class Increment(n: Int)
  sealed abstract class GetCurrentCount
  case object GetCurrentCount extends GetCurrentCount
  final case class CurrentCount(key: String, count: Long)
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect
}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds) {
    case _: Storage.StorageException => Restart
  }

  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this Actor's Dispatcher as ExecutionContext

  override def preStart(): Unit = {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * if it is still failing, it will be stopped. Better to back off than to keep
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive a Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    storage = Some(context.watch(context.actorOf(Props[Storage](), name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter.foreach { _ ! UseStorage(storage) }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty

    case msg: Increment       => forwardOrPlaceInBacklog(msg)

    case msg: GetCurrentCount => forwardOrPlaceInBacklog(msg)

    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter.foreach { _ ! UseStorage(None) }
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegating to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c.forward(msg)
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable("CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }
}

object Counter {
  final case class UseStorage(storage: Option[ActorRef])
}

/**
 * The in-memory count variable that will send the current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()

    case Increment(n) =>
      count += n
      storeCount()

    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)
  }

  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage.foreach { _ ! Store(Entry(key, count)) }
  }
}

object Storage {
  final case class Store(entry: Entry)
  final case class Get(key: String)
  final case class Entry(key: String, value: Long)
  class StorageException(msg: String) extends RuntimeException(msg)
}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key)                 => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

object DummyDB {
  import Storage.StorageException
  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
package jdocs.actor;

import static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.stop;
import static akka.japi.Util.classTag;
import static akka.pattern.Patterns.pipe;
import static jdocs.actor.FaultHandlingDocSample.CounterApi.*;
import static jdocs.actor.FaultHandlingDocSample.CounterServiceApi.*;
import static jdocs.actor.FaultHandlingDocSample.StorageApi.*;
import static jdocs.actor.FaultHandlingDocSample.WorkerApi.*;

import akka.actor.*;
import akka.dispatch.Mapper;
import akka.event.LoggingReceive;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FaultHandlingDocSample {

  /** Runs the sample */
  public static void main(String[] args) {
    Config config =
        ConfigFactory.parseString(
            "akka.loglevel = \"DEBUG\"\n"
                + "akka.actor.debug {\n"
                + "  receive = on\n"
                + "  lifecycle = on\n"
                + "}\n");

    ActorSystem system = ActorSystem.create("FaultToleranceSample", config);
    ActorRef worker = system.actorOf(Props.create(Worker.class), "worker");
    ActorRef listener = system.actorOf(Props.create(Listener.class), "listener");
    // start the work and listen on progress
    // note that the listener is used as sender of the tell,
    // i.e. it will receive replies from the worker
    worker.tell(Start, listener);
  }

  /**
   * Listens on progress from the worker and shuts down the system when enough work has been done.
   */
  public static class Listener extends AbstractLoggingActor {

    @Override
    public void preStart() {
      // If we don't get any progress within 15 seconds then the service
      // is unavailable
      getContext().setReceiveTimeout(Duration.ofSeconds(15));
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Progress.class,
                  progress -> {
                    log().info("Current progress: {} %", progress.percent);
                    if (progress.percent >= 100.0) {
                      log().info("That's all, shutting down");
                      getContext().getSystem().terminate();
                    }
                  })
              .matchEquals(
                  ReceiveTimeout.getInstance(),
                  x -> {
                    // No progress within 15 seconds, ServiceUnavailable
                    log().error("Shutting down due to unavailable service");
                    getContext().getSystem().terminate();
                  })
              .build(),
          getContext());
    }
  }

  public interface WorkerApi {
    public static final Object Start = "Start";
    public static final Object Do = "Do";

    public static class Progress {
      public final double percent;

      public Progress(double percent) {
        this.percent = percent;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), percent);
      }
    }
  }

  /**
   * Worker performs some work when it receives the Start message. It will continuously notify the
   * sender of the Start message of current Progress. The Worker supervises the CounterService.
   */
  public static class Worker extends AbstractLoggingActor {
    final Timeout askTimeout = Timeout.create(Duration.ofSeconds(5));

    // The sender of the initial Start message will continuously be notified
    // about progress
    ActorRef progressListener;
    final ActorRef counterService =
        getContext().actorOf(Props.create(CounterService.class), "counter");
    final int totalCount = 51;

    // Stop the CounterService child if it throws ServiceUnavailable
    private static final SupervisorStrategy strategy =
        new OneForOneStrategy(
            DeciderBuilder.match(ServiceUnavailable.class, e -> stop())
                .matchAny(o -> escalate())
                .build());

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .matchEquals(
                  Start,
                  x -> progressListener == null,
                  x -> {
                    progressListener = getSender();
                    getContext()
                        .getSystem()
                        .scheduler()
                        .scheduleWithFixedDelay(
                            Duration.ZERO,
                            Duration.ofSeconds(1L),
                            getSelf(),
                            Do,
                            getContext().getDispatcher(),
                            null);
                  })
              .matchEquals(
                  Do,
                  x -> {
                    counterService.tell(new Increment(1), getSelf());
                    counterService.tell(new Increment(1), getSelf());
                    counterService.tell(new Increment(1), getSelf());
                    // Send current progress to the initial sender
                    pipe(
                            Patterns.ask(counterService, GetCurrentCount, askTimeout)
                                .mapTo(classTag(CurrentCount.class))
                                .map(
                                    new Mapper<CurrentCount, Progress>() {
                                      public Progress apply(CurrentCount c) {
                                        return new Progress(100.0 * c.count / totalCount);
                                      }
                                    },
                                    getContext().dispatcher()),
                            getContext().dispatcher())
                        .to(progressListener);
                  })
              .build(),
          getContext());
    }
  }

  public interface CounterServiceApi {

    public static final Object GetCurrentCount = "GetCurrentCount";

    public static class CurrentCount {
      public final String key;
      public final long count;

      public CurrentCount(String key, long count) {
        this.key = key;
        this.count = count;
      }

      public String toString() {
        return String.format("%s(%s, %s)", getClass().getSimpleName(), key, count);
      }
    }

    public static class Increment {
      public final long n;

      public Increment(long n) {
        this.n = n;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), n);
      }
    }

    public static class ServiceUnavailable extends RuntimeException {
      private static final long serialVersionUID = 1L;

      public ServiceUnavailable(String msg) {
        super(msg);
      }
    }
  }

  /**
   * Adds the value received in Increment message to a persistent counter. Replies with CurrentCount
   * when it is asked for CurrentCount. CounterService supervises Storage and Counter.
   */
  public static class CounterService extends AbstractLoggingActor {

    // Reconnect message
    static final Object Reconnect = "Reconnect";

    private static class SenderMsgPair {
      final ActorRef sender;
      final Object msg;

      SenderMsgPair(ActorRef sender, Object msg) {
        this.msg = msg;
        this.sender = sender;
      }
    }

    final String key = getSelf().path().name();
    ActorRef storage;
    ActorRef counter;
    final List<SenderMsgPair> backlog = new ArrayList<>();
    final int MAX_BACKLOG = 10000;

    // Restart the storage child when StorageException is thrown.
    // After 3 restarts within 5 seconds it will be stopped.
    private static final SupervisorStrategy strategy =
        new OneForOneStrategy(
            3,
            Duration.ofSeconds(5),
            DeciderBuilder.match(StorageException.class, e -> restart())
                .matchAny(o -> escalate())
                .build());

    @Override
    public SupervisorStrategy supervisorStrategy() {
      return strategy;
    }

    @Override
    public void preStart() {
      initStorage();
    }

    /**
     * The child storage is restarted in case of failure, but after 3 restarts, if it is still
     * failing, it will be stopped. Better to back off than to keep failing. When it has been
     * stopped we will schedule a Reconnect after a delay. Watch the child so we receive a
     * Terminated message when it has been terminated.
     */
    void initStorage() {
      storage = getContext().watch(getContext().actorOf(Props.create(Storage.class), "storage"));
      // Tell the counter, if any, to use the new storage
      if (counter != null) counter.tell(new UseStorage(storage), getSelf());
      // We need the initial value to be able to operate
      storage.tell(new Get(key), getSelf());
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Entry.class,
                  entry -> entry.key.equals(key) && counter == null,
                  entry -> {
                    // Reply from Storage of the initial value, now we can create the Counter
                    final long value = entry.value;
                    counter = getContext().actorOf(Props.create(Counter.class, key, value));
                    // Tell the counter to use current storage
                    counter.tell(new UseStorage(storage), getSelf());
                    // and send the buffered backlog to the counter
                    for (SenderMsgPair each : backlog) {
                      counter.tell(each.msg, each.sender);
                    }
                    backlog.clear();
                  })
              .match(
                  Increment.class,
                  increment -> {
                    forwardOrPlaceInBacklog(increment);
                  })
              .matchEquals(
                  GetCurrentCount,
                  gcc -> {
                    forwardOrPlaceInBacklog(gcc);
                  })
              .match(
                  Terminated.class,
                  o -> {
                    // After 3 restarts the storage child is stopped.
                    // We receive Terminated because we watch the child, see initStorage.
                    storage = null;
                    // Tell the counter, if any, that there is no storage for the moment
                    if (counter != null) counter.tell(new UseStorage(null), getSelf());
                    // Try to re-establish storage after a while
                    getContext()
                        .getSystem()
                        .scheduler()
                        .scheduleOnce(
                            Duration.ofSeconds(10),
                            getSelf(),
                            Reconnect,
                            getContext().getDispatcher(),
                            null);
                  })
              .matchEquals(
                  Reconnect,
                  o -> {
                    // Re-establish storage after the scheduled delay
                    initStorage();
                  })
              .build(),
          getContext());
    }

    void forwardOrPlaceInBacklog(Object msg) {
      // We need the initial value from storage before we can start delegating to
      // the counter. Before that we place the messages in a backlog, to be sent
      // to the counter when it is initialized.
      if (counter == null) {
        if (backlog.size() >= MAX_BACKLOG)
          throw new ServiceUnavailable("CounterService not available," + " lack of initial value");
        backlog.add(new SenderMsgPair(getSender(), msg));
      } else {
        counter.forward(msg, getContext());
      }
    }
  }

  public interface CounterApi {
    public static class UseStorage {
      public final ActorRef storage;

      public UseStorage(ActorRef storage) {
        this.storage = storage;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), storage);
      }
    }
  }

  /**
   * The in-memory count variable that will send the current value to the Storage, if there is any
   * storage available at the moment.
   */
  public static class Counter extends AbstractLoggingActor {
    final String key;
    long count;
    ActorRef storage;

    public Counter(String key, long initialValue) {
      this.key = key;
      this.count = initialValue;
    }

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  UseStorage.class,
                  useStorage -> {
                    storage = useStorage.storage;
                    storeCount();
                  })
              .match(
                  Increment.class,
                  increment -> {
                    count += increment.n;
                    storeCount();
                  })
              .matchEquals(
                  GetCurrentCount,
                  gcc -> {
                    getSender().tell(new CurrentCount(key, count), getSelf());
                  })
              .build(),
          getContext());
    }

    void storeCount() {
      // Delegate dangerous work, to protect our valuable state.
      // We can continue without storage.
      if (storage != null) {
        storage.tell(new Store(new Entry(key, count)), getSelf());
      }
    }
  }

  public interface StorageApi {

    public static class Store {
      public final Entry entry;

      public Store(Entry entry) {
        this.entry = entry;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), entry);
      }
    }

    public static class Entry {
      public final String key;
      public final long value;

      public Entry(String key, long value) {
        this.key = key;
        this.value = value;
      }

      public String toString() {
        return String.format("%s(%s, %s)", getClass().getSimpleName(), key, value);
      }
    }

    public static class Get {
      public final String key;

      public Get(String key) {
        this.key = key;
      }

      public String toString() {
        return String.format("%s(%s)", getClass().getSimpleName(), key);
      }
    }

    public static class StorageException extends RuntimeException {
      private static final long serialVersionUID = 1L;

      public StorageException(String msg) {
        super(msg);
      }
    }
  }

  /**
   * Saves key/value pairs to persistent storage when receiving Store message. Replies with current
   * value when receiving Get message. Will throw StorageException if the underlying data store is
   * out of order.
   */
  public static class Storage extends AbstractLoggingActor {

    final DummyDB db = DummyDB.instance;

    @Override
    public Receive createReceive() {
      return LoggingReceive.create(
          receiveBuilder()
              .match(
                  Store.class,
                  store -> {
                    db.save(store.entry.key, store.entry.value);
                  })
              .match(
                  Get.class,
                  get -> {
                    Long value = db.load(get.key);
                    getSender()
                        .tell(
                            new Entry(get.key, value == null ? Long.valueOf(0L) : value),
                            getSelf());
                  })
              .build(),
          getContext());
    }
  }

  public static class DummyDB {
    public static final DummyDB instance = new DummyDB();
    private final Map<String, Long> db = new HashMap<String, Long>();

    private DummyDB() {}

    public synchronized void save(String key, Long value) throws StorageException {
      if (11 <= value && value <= 14)
        throw new StorageException("Simulated store failure " + value);
      db.put(key, value);
    }

    public synchronized Long load(String key) throws StorageException {
      return db.get(key);
    }
  }
}
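Both versions of CounterService buffer Increment and GetCurrentCount messages until the initial value has arrived from the Storage. When the backlog is full, they give up with ServiceUnavailable. The plain-Java sketch below isolates that buffering logic. BacklogBuffer and counterCreated are hypothetical names, and IllegalStateException stands in for ServiceUnavailable. A Consumer stands in for the Counter actor, and the sketch omits sender tracking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of forwardOrPlaceInBacklog: buffer until the counter exists, then replay. */
final class BacklogBuffer<M> {
  private final int maxBacklog;
  private final List<M> backlog = new ArrayList<>();
  private Consumer<M> counter; // null until the initial value has arrived

  BacklogBuffer(int maxBacklog) {
    this.maxBacklog = maxBacklog;
  }

  /** Forwards to the counter if it exists, otherwise buffers (or fails when the backlog is full). */
  void forwardOrPlaceInBacklog(M msg) {
    if (counter != null) {
      counter.accept(msg);
    } else {
      if (backlog.size() >= maxBacklog) {
        // Stand-in for ServiceUnavailable in the sample
        throw new IllegalStateException("CounterService not available, lack of initial value");
      }
      backlog.add(msg);
    }
  }

  /** Called when the initial value arrives: install the counter and replay the backlog in order. */
  void counterCreated(Consumer<M> newCounter) {
    counter = newCounter;
    for (M msg : backlog) {
      counter.accept(msg);
    }
    backlog.clear();
  }

  int backlogSize() {
    return backlog.size();
  }

  public static void main(String[] args) {
    BacklogBuffer<String> buffer = new BacklogBuffer<>(10_000);
    buffer.forwardOrPlaceInBacklog("Increment(1)");    // buffered: no counter yet
    buffer.forwardOrPlaceInBacklog("GetCurrentCount"); // buffered
    buffer.counterCreated(msg -> System.out.println("counter got " + msg)); // replays both
    buffer.forwardOrPlaceInBacklog("Increment(1)");    // forwarded directly
  }
}
```

The backlog is replayed in arrival order, so a buffered GetCurrentCount is answered only after the increments that were buffered before it.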