Actor discovery

Note

For the Akka Classic documentation of this feature see Classic Actors.

Dependency

To use Akka Actor Typed, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-actor-typed" % "2.6-SNAPSHOT"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor-typed_2.12</artifactId>
  <version>2.6-SNAPSHOT</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-actor-typed_2.12', version: '2.6-SNAPSHOT'
}

Obtaining Actor references

There are two general ways to obtain Actor references: by creating actors and by discovery using the Receptionist.

You can pass actor references between actors as constructor parameters or part of messages.

Sometimes you need something to bootstrap the interaction, for example when actors are running on different nodes in the Cluster or when “dependency injection” with constructor parameters is not applicable.

Receptionist

When an actor needs to be discovered by another actor but you are unable to put a reference to it in an incoming message, you can use the Receptionist. You register the specific actors that should be discoverable from other nodes in the local Receptionist instance. The API of the receptionist is also based on actor messages. This registry of actor references is then automatically distributed to all other nodes in the cluster. You can lookup such actors with the key that was used when they were registered. The reply to such a Find request is a Listing, which contains a Set of actor references that are registered for the key. Note that several actors can be registered to the same key.

The registry is dynamic. New actors can be registered during the lifecycle of the system. Entries are removed when registered actors are stopped or a node is removed from the Cluster. To facilitate this dynamic aspect you can also subscribe to changes with the Receptionist.Subscribe message. It will send Listing messages to the subscriber when entries for a key are changed.

These imports are used in the following example:

Scala
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors
Java
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;

First we create a PingService actor and register it with the Receptionist against a ServiceKey that will later be used to lookup the reference:

Scala
object PingService {
  val PingServiceKey = ServiceKey[Ping]("pingService")

  final case class Ping(replyTo: ActorRef[Pong.type])
  final case object Pong

  def apply(): Behavior[Ping] = {
    Behaviors.setup { context =>
      context.system.receptionist ! Receptionist.Register(PingServiceKey, context.self)

      Behaviors.receiveMessage {
        case Ping(replyTo) =>
          context.log.info("Pinged by {}", replyTo)
          replyTo ! Pong
          Behaviors.same
      }
    }
  }
}
Java
public class PingService {

  public static final ServiceKey<Ping> pingServiceKey =
      ServiceKey.create(Ping.class, "pingService");

  public static class Pong {}

  public static class Ping {
    private final ActorRef<Pong> replyTo;

    public Ping(ActorRef<Pong> replyTo) {
      this.replyTo = replyTo;
    }
  }

  public static Behavior<Ping> create() {
    return Behaviors.setup(
        context -> {
          context
              .getSystem()
              .receptionist()
              .tell(Receptionist.register(pingServiceKey, context.getSelf()));

          return new PingService(context).behavior();
        });
  }

  private final ActorContext<Ping> context;

  private PingService(ActorContext<Ping> context) {
    this.context = context;
  }

  private Behavior<Ping> behavior() {
    return Behaviors.receive(Ping.class).onMessage(Ping.class, this::onPing).build();
  }

  private Behavior<Ping> onPing(Ping msg) {
    context.getLog().info("Pinged by {}", msg.replyTo);
    msg.replyTo.tell(new Pong());
    return Behaviors.same();
  }
}

Then we have another actor that requires a PingService to be constructed:

Scala
object Pinger {
  def apply(pingService: ActorRef[PingService.Ping]): Behavior[PingService.Pong.type] = {
    Behaviors.setup { context =>
      pingService ! PingService.Ping(context.self)

      Behaviors.receiveMessage { _ =>
        context.log.info("{} was ponged!!", context.self)
        Behaviors.stopped
      }
    }
  }
}
Java
public class Pinger {
  private final ActorContext<PingService.Pong> context;
  private final ActorRef<PingService.Ping> pingService;

  private Pinger(ActorContext<PingService.Pong> context, ActorRef<PingService.Ping> pingService) {
    this.context = context;
    this.pingService = pingService;
  }

  public static Behavior<PingService.Pong> create(ActorRef<PingService.Ping> pingService) {
    return Behaviors.setup(
        ctx -> {
          pingService.tell(new PingService.Ping(ctx.getSelf()));
          return new Pinger(ctx, pingService).behavior();
        });
  }

  private Behavior<PingService.Pong> behavior() {
    return Behaviors.receive(PingService.Pong.class)
        .onMessage(PingService.Pong.class, this::onPong)
        .build();
  }

  private Behavior<PingService.Pong> onPong(PingService.Pong msg) {
    context.getLog().info("{} was ponged!!", context.getSelf());
    return Behaviors.stopped();
  }
}

Finally in the guardian actor we spawn the service as well as subscribing to any actors registering against the ServiceKey. Subscribing means that the guardian actor will be informed of any new registrations via a Listing message:

Scala
object Guardian {
  def apply(): Behavior[Nothing] = {
    Behaviors
      .setup[Receptionist.Listing] { context =>
        context.spawnAnonymous(PingService())
        context.system.receptionist ! Receptionist.Subscribe(PingService.PingServiceKey, context.self)

        Behaviors.receiveMessagePartial[Receptionist.Listing] {
          case PingService.PingServiceKey.Listing(listings) =>
            listings.foreach(ps => context.spawnAnonymous(Pinger(ps)))
            Behaviors.same
        }
      }
      .narrow
  }
}
Java
public class Guardian {

  public static Behavior<Void> create() {
    return Behaviors.setup(
            (ActorContext<Receptionist.Listing> context) -> {
              context
                  .getSystem()
                  .receptionist()
                  .tell(
                      Receptionist.subscribe(
                          PingService.pingServiceKey, context.getSelf().narrow()));
              context.spawnAnonymous(PingService.create());

              return new Guardian(context).behavior();
            })
        .unsafeCast(); // Void
  }

  private final ActorContext<Receptionist.Listing> context;

  private Guardian(ActorContext<Receptionist.Listing> context) {
    this.context = context;
  }

  private Behavior<Receptionist.Listing> behavior() {
    return Behaviors.receive(Receptionist.Listing.class)
        .onMessage(Receptionist.Listing.class, this::onListing)
        .build();
  }

  private Behavior<Receptionist.Listing> onListing(Receptionist.Listing msg) {
    msg.getServiceInstances(PingService.pingServiceKey)
        .forEach(pingService -> context.spawnAnonymous(Pinger.create(pingService)));
    return Behaviors.same();
  }
}

Each time a new (which is just a single time in this example) PingService is registered the guardian actor spawns a Pinger for each currently known PingService. The Pinger sends a Ping message and when receiving the Pong reply it stops.

In above example we used Receptionist.Subscribe, but it’s also possible to request a single Listing of the current state without receiving further updates by sending the Receptionist.Find message to the receptionist. An example of using Receptionist.Find:

Scala
object PingManager {
  sealed trait Command
  case object PingAll extends Command
  private case class ListingResponse(listing: Receptionist.Listing) extends Command

  def apply(): Behavior[Command] = {
    Behaviors.setup[Command] { context =>
      val listingResponseAdapter = context.messageAdapter[Receptionist.Listing](ListingResponse)

      context.spawnAnonymous(PingService())

      Behaviors.receiveMessage {
        case PingAll =>
          context.system.receptionist ! Receptionist.Find(PingService.PingServiceKey, listingResponseAdapter)
          Behaviors.same
        case ListingResponse(PingService.PingServiceKey.Listing(listings)) =>
          listings.foreach(ps => context.spawnAnonymous(Pinger(ps)))
          Behaviors.same
      }
    }
  }
}
Java
public class PingManager {

  interface Command {}

  enum PingAll implements Command {
    INSTANCE
  }

  private static class ListingResponse implements Command {
    final Receptionist.Listing listing;

    private ListingResponse(Receptionist.Listing listing) {
      this.listing = listing;
    }
  }

  public static Behavior<Command> create() {
    return Behaviors.setup(context -> new PingManager(context).behavior());
  }

  private final ActorContext<Command> context;
  private final ActorRef<Receptionist.Listing> listingResponseAdapter;

  private PingManager(ActorContext<Command> context) {
    this.context = context;
    this.listingResponseAdapter =
        context.messageAdapter(Receptionist.Listing.class, ListingResponse::new);

    context.spawnAnonymous(PingService.create());
  }

  private Behavior<Command> behavior() {
    return Behaviors.receive(Command.class)
        .onMessage(PingAll.class, notUsed -> onPingAll())
        .onMessage(ListingResponse.class, response -> onListing(response.listing))
        .build();
  }

  private Behavior<Command> onPingAll() {
    context
        .getSystem()
        .receptionist()
        .tell(Receptionist.find(PingService.pingServiceKey, listingResponseAdapter));
    return Behaviors.same();
  }

  private Behavior<Command> onListing(Receptionist.Listing msg) {
    msg.getServiceInstances(PingService.pingServiceKey)
        .forEach(pingService -> context.spawnAnonymous(Pinger.create(pingService)));
    return Behaviors.same();
  }
}

Also note how a messageAdapter is used to convert the Receptionist.Listing to a message type that the PingManager understands.

Cluster Receptionist

The Receptionist also works in a cluster, an actor registered to the receptionist will appear in the receptionist of the other nodes of the cluster.

The state for the receptionist is propagated via distributed data which means that each node will eventually reach the same set of actors per ServiceKey.

Subscriptions and Find queries to a clustered receptionist will keep track of cluster reachability and only list registered actors that are reachable. The full set of actors, including unreachable ones, is available through Listing.allServiceInstancesListing.getAllServiceInstances.

One important difference from local only receptions are the serialization concerns, all messages sent to and back from an actor on another node must be serializable, see serialization.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.