Cluster Singleton

Dependency

To use Cluster Singleton, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.6-SNAPSHOT"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.6-SNAPSHOT</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.6-SNAPSHOT'
}

Introduction

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

Some examples:

  • single point of responsibility for certain cluster-wide consistent decisions, or coordination of actions across the cluster system
  • single entry point to an external system
  • single master, many workers
  • centralized naming service, or routing logic

Using a singleton should not be the first design choice. It has several drawbacks, such as becoming a bottleneck because all requests pass through a single actor. A single point of failure is also a relevant concern, but in some cases this feature addresses it by making sure that another singleton instance will eventually be started.

Example

Any Behavior can be run as a singleton. E.g. a basic counter:

Scala
object Counter {
  trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Int]) extends Command
  case object GoodByeCounter extends Command

  def apply(): Behavior[Command] = {
    def updated(value: Int): Behavior[Command] = {
      Behaviors.receiveMessage[Command] {
        case Increment =>
          updated(value + 1)
        case GetValue(replyTo) =>
          replyTo ! value
          Behaviors.same
        case GoodByeCounter =>
          // Possible async action then stop
          Behaviors.stopped
      }
    }

    updated(0)
  }
}
Java
public class Counter extends AbstractBehavior<Counter.Command> {

  public interface Command {}

  public enum Increment implements Command {
    INSTANCE
  }

  public static class GetValue implements Command {
    private final ActorRef<Integer> replyTo;

    public GetValue(ActorRef<Integer> replyTo) {
      this.replyTo = replyTo;
    }
  }

  public enum GoodByeCounter implements Command {
    INSTANCE
  }

  public static Behavior<Command> create() {
    return Behaviors.setup(Counter::new);
  }

  private final ActorContext<Command> context;
  private int value = 0;

  private Counter(ActorContext<Command> context) {
    this.context = context;
  }

  @Override
  public Receive<Command> createReceive() {
    return newReceiveBuilder()
        .onMessage(Increment.class, msg -> onIncrement())
        .onMessage(GetValue.class, this::onGetValue)
        .onMessage(GoodByeCounter.class, msg -> onGoodByeCounter())
        .build();
  }

  private Behavior<Command> onIncrement() {
    value++;
    return this;
  }

  private Behavior<Command> onGetValue(GetValue msg) {
    msg.replyTo.tell(value);
    return this;
  }

  private Behavior<Command> onGoodByeCounter() {
    // Possible async action then stop
    return Behaviors.stopped();
  }
}

Then on every node in the cluster, or every node with a given role, use the ClusterSingleton extension to spawn the singleton. One instance will run per data centre of the cluster:

Scala
import akka.cluster.typed.ClusterSingleton
import akka.cluster.typed.SingletonActor

val singletonManager = ClusterSingleton(system)
// Start if needed and provide a proxy to a named singleton
val proxy: ActorRef[Counter.Command] = singletonManager.init(
  SingletonActor(Behaviors.supervise(Counter()).onFailure[Exception](SupervisorStrategy.restart), "GlobalCounter"))

proxy ! Counter.Increment
Java
import akka.cluster.typed.ClusterSingleton;
import akka.cluster.typed.SingletonActor;

ClusterSingleton singleton = ClusterSingleton.get(system);
// Start if needed and provide a proxy to a named singleton
ActorRef<Counter.Command> proxy =
    singleton.init(SingletonActor.of(Counter.create(), "GlobalCounter"));

proxy.tell(Counter.Increment.INSTANCE);
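
The "every node with a given role" case can be sketched as follows. This is a minimal sketch, not the canonical setup: it assumes the singleton should only run on nodes that list the hypothetical role `counter-host` under `akka.cluster.roles`, and it uses a cut-down stand-in for the Counter protocol so that it compiles on its own. The role is passed through `ClusterSingletonSettings.withRole` and attached with `SingletonActor.withSettings`.

```scala
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{ ClusterSingleton, ClusterSingletonSettings, SingletonActor }

object RoleRestrictedCounter {
  // Cut-down stand-in for the Counter protocol shown earlier (illustration only)
  sealed trait Command
  case object Increment extends Command

  def counter(value: Int): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Increment => counter(value + 1)
    }

  // Hypothetical role name; it must match an entry in akka.cluster.roles
  // on the nodes that are allowed to host the singleton
  val Role = "counter-host"

  // Default singleton settings from configuration, restricted to the role
  def settings(system: ActorSystem[_]): ClusterSingletonSettings =
    ClusterSingletonSettings(system).withRole(Role)

  // Call on every node; the returned ActorRef is the proxy to the singleton
  def init(system: ActorSystem[_]): ActorRef[Command] =
    ClusterSingleton(system).init(
      SingletonActor(counter(0), "GlobalCounter").withSettings(settings(system)))
}
```

With this arrangement `init` is still called on all nodes. Nodes without the role are expected to get only the proxy, while the singleton itself is hosted on a node that has the role.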

Supervision

The default supervision strategy when an exception is thrown is for an actor to be stopped. The Scala example above overrides this with a restart strategy so that the singleton keeps running after a failure. Another option would be to restart with a backoff:

Scala
val proxyBackOff: ActorRef[Counter.Command] = singletonManager.init(
  SingletonActor(
    Behaviors
      .supervise(Counter())
      .onFailure[Exception](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.2)),
    "GlobalCounter"))
Java
ClusterSingleton singleton = ClusterSingleton.get(system);
ActorRef<Counter.Command> proxy =
    singleton.init(
        SingletonActor.of(
            Behaviors.supervise(Counter.create())
                .onFailure(
                    SupervisorStrategy.restartWithBackoff(
                        Duration.ofSeconds(1), Duration.ofSeconds(10), 0.2)),
            "GlobalCounter"));

Be aware that with a backoff strategy there will be times when the singleton is not running, because each restart is delayed. See Fault Tolerance for a full list of supervision options.

Application specific stop message

An application specific stopMessage can be used to close the resources before actually stopping the singleton actor. This stopMessage is sent to the singleton actor to tell it to finish its work, close resources, and stop. The hand-over to the new oldest node is completed when the singleton actor is terminated. If the shutdown logic does not include any asynchronous actions it can be executed in the PostStop signal handler.

Scala
val singletonActor = SingletonActor(Counter(), "GlobalCounter").withStopMessage(Counter.GoodByeCounter)
singletonManager.init(singletonActor)
Java
SingletonActor<Counter.Command> counterSingleton =
    SingletonActor.of(Counter.create(), "GlobalCounter")
        .withStopMessage(Counter.GoodByeCounter.INSTANCE);
ActorRef<Counter.Command> proxy = singleton.init(counterSingleton);
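
When the shutdown logic is purely synchronous, the PostStop alternative mentioned above might look like the following sketch. It uses a cut-down stand-in for the Counter protocol, and `release` is a hypothetical callback standing in for whatever resource cleanup the application needs; neither is part of the Akka API.

```scala
import akka.actor.typed.{ Behavior, PostStop }
import akka.actor.typed.scaladsl.Behaviors

object CounterWithCleanup {
  // Cut-down stand-in for the Counter protocol shown earlier (illustration only)
  sealed trait Command
  case object Increment extends Command

  // `release` is a hypothetical synchronous cleanup hook supplied by the application
  def apply(release: () => Unit): Behavior[Command] = {
    def updated(value: Int): Behavior[Command] =
      Behaviors
        .receiveMessage[Command] {
          case Increment => updated(value + 1)
        }
        .receiveSignal {
          // PostStop is delivered once the actor has been stopped,
          // so only non-blocking, synchronous cleanup belongs here
          case (_, PostStop) =>
            release()
            Behaviors.same
        }

    updated(0)
  }
}
```

No stopMessage is needed in this variant: the cleanup runs whenever the singleton actor is stopped, including during hand-over.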

Accessing singleton of another data centre

TODO
