Cluster Singleton

Dependency

To use Cluster Singleton, add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5.24"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.5.24</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.5.24'
}

Introduction

Note

This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.

For some use cases it is convenient and sometimes also mandatory to ensure that you have exactly one actor of a certain type running somewhere in the cluster.

Some examples:

  • single point of responsibility for certain cluster-wide consistent decisions, or coordination of actions across the cluster system
  • single entry point to an external system
  • single master, many workers
  • centralized naming service, or routing logic

Using a singleton should not be the first design choice. It has several drawbacks, such as being a single-point bottleneck. Being a single point of failure is also a relevant concern, but in some cases this feature takes care of that by making sure that another singleton instance will eventually be started.

Example

Any Behavior can be run as a singleton. E.g. a basic counter:

Scala
trait CounterCommand
case object Increment extends CounterCommand
final case class GetValue(replyTo: ActorRef[Int]) extends CounterCommand
case object GoodByeCounter extends CounterCommand

def counter(value: Int): Behavior[CounterCommand] =
  Behaviors.receiveMessage[CounterCommand] {
    case Increment =>
      counter(value + 1)
    case GetValue(replyTo) =>
      replyTo ! value
      Behaviors.same
    case GoodByeCounter =>
      // Do async action then stop
      Behaviors.stopped
  }
Java
interface CounterCommand {}

public static class Increment implements CounterCommand {}

public static class GoodByeCounter implements CounterCommand {}

public static class GetValue implements CounterCommand {
  private final ActorRef<Integer> replyTo;

  public GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;
  }
}

public static Behavior<CounterCommand> counter(String entityId, Integer value) {
  return Behaviors.receive(CounterCommand.class)
      .onMessage(Increment.class, (ctx, msg) -> counter(entityId, value + 1))
      .onMessage(
          GetValue.class,
          (ctx, msg) -> {
            msg.replyTo.tell(value);
            return Behaviors.same();
          })
      .onMessage(GoodByeCounter.class, (ctx, msg) -> Behaviors.stopped())
      .build();
}

Then on every node in the cluster, or every node with a given role, use the ClusterSingleton extension to spawn the singleton. One instance will run per data centre of the cluster:

Scala
import akka.actor.typed.SupervisorStrategy
import akka.cluster.typed.{ ClusterSingleton, SingletonActor }

val singletonManager = ClusterSingleton(system)
// Start if needed and provide a proxy to a named singleton
val proxy: ActorRef[CounterCommand] = singletonManager.init(
  SingletonActor(Behaviors.supervise(counter(0)).onFailure[Exception](SupervisorStrategy.restart), "GlobalCounter"))

proxy ! Increment
Java
import akka.cluster.typed.*;
import java.time.Duration;
ClusterSingleton singleton = ClusterSingleton.get(system);
// Start if needed and provide a proxy to a named singleton
ActorRef<CounterCommand> proxy =
    singleton.init(SingletonActor.of(counter("TheCounter", 0), "GlobalCounter"));

proxy.tell(new Increment());

Supervision

By default, an actor is stopped when it throws an exception. The Scala example above overrides this with a restart strategy so that the singleton keeps running. Another option is to restart with a backoff:

Scala
import scala.concurrent.duration._

val proxyBackOff: ActorRef[CounterCommand] = singletonManager.init(
  SingletonActor(
    Behaviors
      .supervise(counter(0))
      .onFailure[Exception](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.2)),
    "GlobalCounter"))
Java
ClusterSingleton singleton = ClusterSingleton.get(system);
ActorRef<CounterCommand> proxy =
    singleton.init(
        SingletonActor.of(
            Behaviors.supervise(counter("TheCounter", 0))
                .onFailure(
                    SupervisorStrategy.restartWithBackoff(
                        Duration.ofSeconds(1), Duration.ofSeconds(10), 0.2)),
            "GlobalCounter"));

Be aware that with a backoff the singleton will not be running during the delay before each restart. See Fault Tolerance for a full list of supervision options.
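To get a feel for how long the singleton may be unavailable, the sketch below tabulates the delay before each restart for the parameters used above (1 second minimum, 10 seconds maximum, random factor 0.2). It assumes that the delay doubles with every consecutive restart, is capped at the maximum, and is then stretched by a random amount of up to 20%. That is an assumption about how the three parameters interact, not a statement of Akka's exact internal formula. BackoffSchedule and its methods are hypothetical helpers and not part of Akka.

```java
import java.time.Duration;

public class BackoffSchedule {

  // Hypothetical helper: delay before jitter, assuming exponential doubling
  // from minBackoff that is capped at maxBackoff.
  static Duration baseDelay(int restartCount, Duration minBackoff, Duration maxBackoff) {
    // Limit the exponent so the multiplication cannot overflow.
    double factor = Math.pow(2, Math.min(restartCount, 30));
    double millis = Math.min((double) maxBackoff.toMillis(), minBackoff.toMillis() * factor);
    return Duration.ofMillis(Math.round(millis));
  }

  // Hypothetical helper: longest delay once jitter of up to randomFactor is added.
  static Duration upperBound(Duration base, double randomFactor) {
    return Duration.ofMillis(Math.round(base.toMillis() * (1.0 + randomFactor)));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(10);
    double randomFactor = 0.2;

    // Print the assumed delay range for the first six consecutive restarts.
    for (int restart = 0; restart < 6; restart++) {
      Duration base = baseDelay(restart, min, max);
      Duration worst = upperBound(base, randomFactor);
      System.out.println(
          "restart " + restart + ": between " + base.toMillis() + " ms and "
              + worst.toMillis() + " ms");
    }
  }
}
```

Under these assumptions the base delays for the first six restarts are 1, 2, 4, 8, 10 and 10 seconds, and no single delay exceeds 12 seconds. If the singleton must not be unavailable for that long, a smaller maximum backoff or a plain restart strategy may be a better fit.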

Application specific stop message

An application-specific stopMessage can be used to release resources before the singleton actor is actually stopped. This stopMessage is sent to the singleton actor to tell it to finish its work, close resources, and stop. The hand-over to the new oldest node is completed when the singleton actor is terminated. If the shutdown logic does not include any asynchronous actions, it can be executed in the PostStop signal handler instead.

Scala
val singletonActor = SingletonActor(counter(0), "GlobalCounter").withStopMessage(GoodByeCounter)
singletonManager.init(singletonActor)
Java
SingletonActor<CounterCommand> counterSingleton =
    SingletonActor.of(counter("TheCounter", 0), "GlobalCounter")
        .withStopMessage(new GoodByeCounter());
ActorRef<CounterCommand> proxy = singleton.init(counterSingleton);

Accessing singleton of another data centre

TODO
