Actor lifecycle
Dependency
To use Akka Actor Typed, you must add the following dependency in your project:
- sbt

    libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.5.32"

- Maven

    <dependencies>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor-typed_2.12</artifactId>
        <version>2.5.32</version>
      </dependency>
    </dependencies>

- Gradle

    dependencies {
      implementation "com.typesafe.akka:akka-actor-typed_2.12:2.5.32"
    }
Introduction
TODO intro
Creating Actors
An actor can create, or spawn, an arbitrary number of child actors, which in turn can spawn children of their own, thus forming an actor hierarchy. The ActorSystem hosts the hierarchy, and there can be only one root actor: the actor at the top of the hierarchy of the ActorSystem. The lifecycle of a child actor is tied to its parent: a child can stop itself or be stopped at any time, but it can never outlive its parent.
The Guardian Actor
The root actor, also called the guardian actor, is created along with the ActorSystem. Messages sent to the actor system are directed to the root actor. The root actor is defined by the behavior used to create the ActorSystem, named HelloWorldMain.main in the example below:
- Scala

    val system: ActorSystem[HelloWorldMain.Start] =
      ActorSystem(HelloWorldMain.main, "hello")

    system ! HelloWorldMain.Start("World")
    system ! HelloWorldMain.Start("Akka")

- Java

    final ActorSystem<HelloWorldMain.Start> system =
        ActorSystem.create(HelloWorldMain.main, "hello");

    system.tell(new HelloWorldMain.Start("World"));
    system.tell(new HelloWorldMain.Start("Akka"));
Spawning Children
Child actors are spawned with the spawn method of ActorContext (akka.actor.typed.scaladsl.ActorContext in Scala, akka.actor.typed.javadsl.ActorContext in Java). In the example below, when the root actor is started, it spawns a child actor described by the behavior HelloWorld.greeter. Additionally, each time the root actor receives a Start message, it creates a child actor defined by the behavior HelloWorldBot.bot:
- Scala

    object HelloWorldMain {

      final case class Start(name: String)

      val main: Behavior[Start] =
        Behaviors.setup { context =>
          // spawned once, when the root actor starts
          val greeter = context.spawn(HelloWorld.greeter, "greeter")

          Behaviors.receiveMessage { message =>
            // spawned for every Start message, named after the message
            val replyTo = context.spawn(HelloWorldBot.bot(greetingCounter = 0, max = 3), message.name)
            greeter ! HelloWorld.Greet(message.name, replyTo)
            Behaviors.same
          }
        }
    }

- Java

    public abstract static class HelloWorldMain {
      private HelloWorldMain() {}

      public static class Start {
        public final String name;

        public Start(String name) {
          this.name = name;
        }
      }

      public static final Behavior<Start> main =
          Behaviors.setup(
              context -> {
                // spawned once, when the root actor starts
                final ActorRef<HelloWorld.Greet> greeter =
                    context.spawn(HelloWorld.greeter, "greeter");

                return Behaviors.receiveMessage(
                    message -> {
                      // spawned for every Start message, named after the message
                      ActorRef<HelloWorld.Greeted> replyTo =
                          context.spawn(HelloWorldBot.bot(0, 3), message.name);
                      greeter.tell(new HelloWorld.Greet(message.name, replyTo));
                      return Behaviors.same();
                    });
              });
    }
To specify a dispatcher when spawning an actor, use DispatcherSelector. If none is specified, the actor uses the default dispatcher; see Default dispatcher for details.
- Scala

    val main: Behavior[Start] =
      Behaviors.setup { context =>
        val dispatcherPath = "akka.actor.default-blocking-io-dispatcher"

        // the dispatcher is passed as Props, the third argument of spawn
        val props = DispatcherSelector.fromConfig(dispatcherPath)
        val greeter = context.spawn(HelloWorld.greeter, "greeter", props)

        Behaviors.receiveMessage { message =>
          val replyTo = context.spawn(HelloWorldBot.bot(greetingCounter = 0, max = 3), message.name)
          greeter ! HelloWorld.Greet(message.name, replyTo)
          Behaviors.same
        }
      }

- Java

    public static final Behavior<Start> main =
        Behaviors.setup(
            context -> {
              final String dispatcherPath = "akka.actor.default-blocking-io-dispatcher";

              // the dispatcher is passed as Props, the third argument of spawn
              Props props = DispatcherSelector.fromConfig(dispatcherPath);
              final ActorRef<HelloWorld.Greet> greeter =
                  context.spawn(HelloWorld.greeter, "greeter", props);

              return Behaviors.receiveMessage(
                  message -> {
                    ActorRef<HelloWorld.Greeted> replyTo =
                        context.spawn(HelloWorldBot.bot(0, 3), message.name);
                    greeter.tell(new HelloWorld.Greet(message.name, replyTo));
                    return Behaviors.same();
                  });
            });
Refer to Actors for a walk-through of the above examples.
SpawnProtocol
The guardian actor should be responsible for initializing tasks and creating the initial actors of the application, but sometimes you might want to spawn new actors from outside the guardian actor, for example to create one actor per HTTP request.
That is not difficult to implement in your own behavior, but since this is a common pattern there is a predefined message protocol and a behavior implementation for it. It can be used as the guardian actor of the ActorSystem, possibly combined with Behaviors.setup to start some initial tasks or actors. Child actors can then be started from the outside by telling or asking SpawnProtocol.Spawn to the actor reference of the system. When using ask, this is similar to how ActorSystem.actorOf can be used in untyped actors, with the difference that a Future (Scala) or CompletionStage (Java) of the ActorRef is returned.
The guardian behavior can be defined as:
- Scala

    import akka.actor.typed.Behavior
    import akka.actor.typed.SpawnProtocol
    import akka.actor.typed.scaladsl.Behaviors

    object HelloWorldMain {
      val main: Behavior[SpawnProtocol] =
        Behaviors.setup { context =>
          // Start initial tasks
          // context.spawn(...)

          SpawnProtocol.behavior
        }
    }

- Java

    import akka.actor.typed.Behavior;
    import akka.actor.typed.SpawnProtocol;
    import akka.actor.typed.javadsl.Behaviors;

    public abstract static class HelloWorldMain {
      private HelloWorldMain() {}

      public static final Behavior<SpawnProtocol> main =
          Behaviors.setup(
              context -> {
                // Start initial tasks
                // context.spawn(...)

                return SpawnProtocol.behavior();
              });
    }
and the ActorSystem can be created with that main behavior and asked to spawn other actors:
- Scala

    import akka.actor.typed.ActorRef
    import akka.actor.typed.ActorSystem
    import akka.actor.typed.Props
    import akka.util.Timeout
    import akka.actor.Scheduler
    import scala.concurrent.{ ExecutionContext, Future }
    import scala.concurrent.duration._

    val system: ActorSystem[SpawnProtocol] =
      ActorSystem(HelloWorldMain.main, "hello")

    // needed in implicit scope for ask (?)
    import akka.actor.typed.scaladsl.AskPattern._
    implicit val ec: ExecutionContext = system.executionContext
    implicit val timeout: Timeout = Timeout(3.seconds)
    implicit val scheduler: Scheduler = system.scheduler

    val greeter: Future[ActorRef[HelloWorld.Greet]] =
      system.ask(SpawnProtocol.Spawn(behavior = HelloWorld.greeter, name = "greeter", props = Props.empty))

    val greetedBehavior = Behaviors.receive[HelloWorld.Greeted] { (context, message) =>
      context.log.info("Greeting for {} from {}", message.whom, message.from)
      Behaviors.stopped
    }

    val greetedReplyTo: Future[ActorRef[HelloWorld.Greeted]] =
      system.ask(SpawnProtocol.Spawn(greetedBehavior, name = "", props = Props.empty))

    // send the greeting once both actors have been spawned
    for (greeterRef <- greeter; replyToRef <- greetedReplyTo) {
      greeterRef ! HelloWorld.Greet("Akka", replyToRef)
    }

- Java

    import akka.actor.typed.ActorRef;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.Behavior;
    import akka.actor.typed.Props;
    import akka.actor.typed.javadsl.AskPattern;
    import akka.actor.typed.javadsl.Behaviors;
    import java.time.Duration;
    import java.util.concurrent.CompletionStage;

    final ActorSystem<SpawnProtocol> system = ActorSystem.create(HelloWorldMain.main, "hello");
    final Duration timeout = Duration.ofSeconds(3);

    CompletionStage<ActorRef<HelloWorld.Greet>> greeter =
        AskPattern.ask(
            system,
            replyTo -> new SpawnProtocol.Spawn<>(HelloWorld.greeter, "greeter", Props.empty(), replyTo),
            timeout,
            system.scheduler());

    Behavior<HelloWorld.Greeted> greetedBehavior =
        Behaviors.receive(
            (context, message) -> {
              context.getLog().info("Greeting for {} from {}", message.whom, message.from);
              return Behaviors.stopped();
            });

    CompletionStage<ActorRef<HelloWorld.Greeted>> greetedReplyTo =
        AskPattern.ask(
            system,
            replyTo -> new SpawnProtocol.Spawn<>(greetedBehavior, "", Props.empty(), replyTo),
            timeout,
            system.scheduler());

    // send the greeting once both actors have been spawned
    greeter.whenComplete(
        (greeterRef, exc) -> {
          if (exc == null) {
            greetedReplyTo.whenComplete(
                (greetedReplyToRef, exc2) -> {
                  if (exc2 == null) {
                    greeterRef.tell(new HelloWorld.Greet("Akka", greetedReplyToRef));
                  }
                });
          }
        });
The SpawnProtocol can also be used at other places in the actor hierarchy. It doesn’t have to be the root guardian actor.
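As a rough sketch of using the protocol below the guardian, an ordinary actor could spawn SpawnProtocol.behavior as one of its children and hand out that child's reference. The Supervisor object, its Command messages, and the child name "spawner" are hypothetical names introduced only for this illustration; the sketch assumes the same 2.5.x API as the examples above.

```scala
import akka.actor.typed.{ ActorRef, Behavior, SpawnProtocol }
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical supervisor, used only to illustrate a non-root SpawnProtocol
object Supervisor {
  sealed trait Command
  final case class GetSpawner(replyTo: ActorRef[ActorRef[SpawnProtocol]]) extends Command

  val behavior: Behavior[Command] =
    Behaviors.setup { context =>
      // SpawnProtocol.behavior runs as a regular child here, not as the guardian
      val spawner: ActorRef[SpawnProtocol] =
        context.spawn(SpawnProtocol.behavior, "spawner")

      Behaviors.receiveMessage {
        case GetSpawner(replyTo) =>
          // hand the reference out so that others can send SpawnProtocol.Spawn to it
          replyTo ! spawner
          Behaviors.same
      }
    }
}
```

Actors started through the returned reference become children of the "spawner" actor rather than of the guardian, so they are stopped together with it.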
Stopping Actors
An actor can stop itself by returning Behaviors.stopped as the next behavior.
A child actor can be forced to stop after it finishes processing its current message by calling the stop method of the ActorContext from the parent actor. Only child actors can be stopped in that way.
When a parent stops, its child actors are stopped as part of the parent's shutdown procedure.
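A minimal sketch of a parent stopping one of its children with the stop method of the ActorContext is given here. The Parent object, its StartWorker and StopWorker messages, and the no-op worker behavior are hypothetical and exist only for this illustration.

```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical parent that keeps track of the children it has spawned
object Parent {
  sealed trait Command
  final case class StartWorker(name: String) extends Command
  final case class StopWorker(name: String) extends Command

  // Placeholder worker that ignores every message it receives
  val worker: Behavior[String] = Behaviors.ignore

  def parent(workers: Map[String, ActorRef[String]] = Map.empty): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case StartWorker(name) =>
          val ref = context.spawn(worker, name)
          parent(workers + (name -> ref))

        case StopWorker(name) =>
          // stop only works for direct children of this actor;
          // the child stops after finishing its current message
          workers.get(name).foreach(ref => context.stop(ref))
          parent(workers - name)
      }
    }
}
```

Sending StartWorker with a name that is still in use would fail, because child names must be unique within a parent.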
The PostStop signal that results from stopping an actor can be used for cleaning up resources. Note that a handler for this PostStop signal can optionally be passed as a parameter to Behaviors.stopped if different actions are needed when the actor gracefully stops itself than when it is stopped abruptly.
Here is an illustrative example:
- Scala

    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.{ ActorSystem, Logger, PostStop }

    import scala.concurrent.Await
    import scala.concurrent.duration._

    object MasterControlProgramActor {
      sealed trait JobControlLanguage
      final case class SpawnJob(name: String) extends JobControlLanguage
      final case object GracefulShutdown extends JobControlLanguage

      // Predefined cleanup operation
      def cleanup(log: Logger): Unit = log.info("Cleaning up!")

      val mcpa = Behaviors
        .receive[JobControlLanguage] { (context, message) =>
          message match {
            case SpawnJob(jobName) =>
              context.log.info("Spawning job {}!", jobName)
              context.spawn(Job.job(jobName), name = jobName)
              Behaviors.same
            case GracefulShutdown =>
              context.log.info("Initiating graceful shutdown...")
              // perform graceful stop, executing cleanup before final system termination
              // the cleanup to execute is passed as a parameter to Behaviors.stopped
              Behaviors.stopped { () =>
                cleanup(context.system.log)
              }
          }
        }
        .receiveSignal {
          case (context, PostStop) =>
            context.log.info("MCPA stopped")
            Behaviors.same
        }
    }

    object Job {
      import MasterControlProgramActor.JobControlLanguage

      def job(name: String) = Behaviors.receiveSignal[JobControlLanguage] {
        case (context, PostStop) =>
          context.log.info("Worker {} stopped", name)
          Behaviors.same
      }
    }

    import MasterControlProgramActor._

    val system: ActorSystem[JobControlLanguage] = ActorSystem(mcpa, "B7700")

    system ! SpawnJob("a")
    system ! SpawnJob("b")

    // sleep here to allow time for the new actors to be started
    Thread.sleep(100)

    // gracefully stop the system
    system ! GracefulShutdown

    Thread.sleep(100)

    Await.result(system.whenTerminated, 3.seconds)

- Java

    import java.util.concurrent.TimeUnit;

    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.Behavior;
    import akka.actor.typed.PostStop;
    import akka.actor.typed.javadsl.Behaviors;

    public abstract static class JobControl {
      // no instances of this class, it's only a name space for messages
      // and static methods
      private JobControl() {}

      static interface JobControlLanguage {}

      public static final class SpawnJob implements JobControlLanguage {
        public final String name;

        public SpawnJob(String name) {
          this.name = name;
        }
      }

      public static final class GracefulShutdown implements JobControlLanguage {
        public GracefulShutdown() {}
      }

      public static final Behavior<JobControlLanguage> mcpa =
          Behaviors.receive(JobControlLanguage.class)
              .onMessage(
                  SpawnJob.class,
                  (context, message) -> {
                    context.getSystem().log().info("Spawning job {}!", message.name);
                    context.spawn(Job.job(message.name), message.name);
                    return Behaviors.same();
                  })
              .onMessage(
                  GracefulShutdown.class,
                  (context, message) -> {
                    context.getSystem().log().info("Initiating graceful shutdown...");

                    // perform graceful stop, executing cleanup before final system termination
                    // the cleanup to execute is passed as a parameter to Behaviors.stopped
                    return Behaviors.stopped(
                        () -> {
                          context.getSystem().log().info("Cleanup!");
                        });
                  })
              // registered once; the original listing repeated this handler
              .onSignal(
                  PostStop.class,
                  (context, signal) -> {
                    context.getSystem().log().info("Master Control Programme stopped");
                    return Behaviors.same();
                  })
              .build();
    }

    public static class Job {
      public static Behavior<JobControl.JobControlLanguage> job(String name) {
        return Behaviors.receiveSignal(
            (context, PostStop) -> {
              context.getSystem().log().info("Worker {} stopped", name);
              return Behaviors.same();
            });
      }
    }

    final ActorSystem<JobControl.JobControlLanguage> system =
        ActorSystem.create(JobControl.mcpa, "B6700");

    system.tell(new JobControl.SpawnJob("a"));
    system.tell(new JobControl.SpawnJob("b"));

    // sleep here to allow time for the new actors to be started
    Thread.sleep(100);

    system.tell(new JobControl.GracefulShutdown());

    system.getWhenTerminated().toCompletableFuture().get(3, TimeUnit.SECONDS);