Routers

Dependency

To use Akka Actor Typed, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6-SNAPSHOT"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor-typed_2.12</artifactId>
  <version>2.6-SNAPSHOT</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.6-SNAPSHOT'
}

Introduction

In some cases it is useful to distribute messages of the same type over a set of actors, so that messages can be processed in parallel - a single actor will only process one message at a time.

The router itself is a behavior that is spawned into a running actor that will then forward any message sent to it to one final recipient out of the set of routees.

There are two kinds of routers included in Akka Typed - the pool router and the group router.

Pool Router

The pool router is created with a routee Behavior factory and spawns a number of children with that behavior which it will then forward messages to.

If a child is stopped the pool router removes it from its set of routees. When the last child stops the router itself stops. To make a resilient router that deals with failures the routee Behavior must be supervised.

Note that it is important that the Routers.pool factory returns a new behavior instance for every call to the factory or else routees may end up sharing mutable state and not work as expected.

Scala
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Routers
import org.scalatest.WordSpecLike

object Worker {
  sealed trait Command
  case class DoLog(text: String) extends Command

  def apply(): Behavior[Command] = Behaviors.setup { context =>
    context.log.info("Starting worker")

    Behaviors.receiveMessage {
      case DoLog(text) =>
        context.log.info("Got message {}", text)
        Behaviors.same
    }
  }
}

      val pool = Routers.pool(poolSize = 4)(() =>
        // make sure the workers are restarted if they fail
        Behaviors.supervise(Worker()).onFailure[Exception](SupervisorStrategy.restart))
      val router = ctx.spawn(pool, "worker-pool")

      (0 to 10).foreach { n =>
        router ! Worker.DoLog(s"msg $n")
      }
Java
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.GroupRouter;
import akka.actor.typed.javadsl.PoolRouter;
import akka.actor.typed.javadsl.Routers;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;

class Worker {
  interface Command {}

  static class DoLog implements Command {
    public final String text;

    public DoLog(String text) {
      this.text = text;
    }
  }

  static final Behavior<Command> create() {
    return Behaviors.setup(
        context -> {
          context.getLog().info("Starting worker");

          return Behaviors.receive(Command.class)
              .onMessage(DoLog.class, doLog -> onDoLog(context, doLog))
              .build();
        });
  }

  private static Behavior<Command> onDoLog(ActorContext<Command> context, DoLog doLog) {
    context.getLog().info("Got message {}", doLog.text);
    return Behaviors.same();
  }
}

        int poolSize = 4;
        PoolRouter<Worker.Command> pool =
            Routers.pool(
                poolSize,
                () ->
                    // make sure the workers are restarted if they fail
                    Behaviors.supervise(Worker.create()).onFailure(SupervisorStrategy.restart()));
        ActorRef<Worker.Command> router = context.spawn(pool, "worker-pool");

        for (int i = 0; i < 10; i++) {
          router.tell(new Worker.DoLog("msg " + i));
        }

Group Router

The group router is created with a ServiceKey and uses the receptionist (see Receptionist) to discover available actors for that key and routes messages to one of the currently known registered actors for a key.

Since the receptionist is used this means the group router is cluster aware out of the box. The router route messages to registered actors on any node in the cluster that is reachable. If no reachable actor exists the router will fallback and route messages to actors on nodes marked as unreachable.

That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when the group router is started the set of routees it knows about is empty, until it has seen a listing from the receptionist it stashes incoming messages and forwards them as soon as it gets a listing from the receptionist.

When the router has received a listing from the receptionist and the set of registered actors is empty the router will drop them (published them to the event stream as akka.actor.Dropped).

Scala
// this would likely happen elsewhere - if we create it locally we
// can just as well use a pool
val worker = ctx.spawn(Worker(), "worker")
ctx.system.receptionist ! Receptionist.Register(serviceKey, worker)

val group = Routers.group(serviceKey);
val router = ctx.spawn(group, "worker-group");

// the group router will stash messages until it sees the first listing of registered
// services from the receptionist, so it is safe to send messages right away
(0 to 10).foreach { n =>
  router ! Worker.DoLog(s"msg $n")
}
Java
// this would likely happen elsewhere - if we create it locally we
// can just as well use a pool
ActorRef<Worker.Command> worker = context.spawn(Worker.create(), "worker");
context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker));

GroupRouter<Worker.Command> group = Routers.group(serviceKey);
ActorRef<Worker.Command> router = context.spawn(group, "worker-group");

// the group router will stash messages until it sees the first listing of registered
// services from the receptionist, so it is safe to send messages right away
for (int i = 0; i < 10; i++) {
  router.tell(new Worker.DoLog("msg " + i));
}

Routing strategies

There are two different strategies for selecting what routee a message is forwarded to that can be selected from the router before spawning it:

Scala
val alternativePool = pool.withPoolSize(2).withRoundRobinRouting()
Java
PoolRouter<Worker.Command> alternativePool = pool.withPoolSize(2).withRoundRobinRouting();

Round Robin

Rotates over the set of routees making sure that if there are n routees, then for n messages sent through the router, each actor is forwarded one message.

This is the default for pool routers.

Random

Randomly selects a routee when a message is sent through the router.

This is the default for group routers.

Routers and performance

Note that if the routees are sharing a resource, the resource will determine if increasing the number of actors will actually give higher throughput or faster answers. For example if the routees are CPU bound actors it will not give better performance to create more routees than there are threads to execute the actors.

Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees where it can be processed in parallel (depending on the available threads in the dispatcher). In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.