Classic Cluster Usage

Note

Akka Classic pertains to the original Actor APIs, which have been improved by more type safe and guided Actor APIs. Akka Classic is still fully supported and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem, see coexistence. For new projects we recommend using the new Actor API.

For the full documentation of this feature and for new projects see Cluster. For specific documentation topics see:

Note

You have to enable serialization to send messages between ActorSystems in the Cluster. Serialization with Jackson is a good choice in many cases, and our recommendation if you don’t have other preferences or constraints.

Module info

To use Akka Cluster add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.6.1"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster_2.13</artifactId>
  <version>2.6.1</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster_2.13', version: '2.6.1'
}
Project Info: Akka Cluster (classic)
Artifact
com.typesafe.akka
akka-cluster
2.6.1
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.12.10, 2.13.1
JPMS module nameakka.cluster
License
Readiness level
Since 2.2.0, 2013-07-09
Home pagehttps://akka.io/
API documentation
Forums
Release notesakka.io blog
IssuesGithub issues
Sourceshttps://github.com/akka/akka

When and where to use Akka Cluster

See Choosing Akka Cluster in the documentation of the new APIs.

Cluster API Extension

The following configuration enables the Cluster extension to be used. It joins the cluster and an actor subscribes to cluster membership events and logs them.

An actor that uses the cluster extension may look like this:

Scala
/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package scala.docs.cluster

import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorLogging
import akka.actor.Actor

class SimpleClusterListener extends Actor with ActorLogging {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
    //#subscribe
  }
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // ignore
  }
}
Java
/*
 * Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
 */

package jdocs.cluster;

import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.ClusterEvent.MemberRemoved;
import akka.cluster.ClusterEvent.UnreachableMember;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class SimpleClusterListener extends AbstractActor {
  LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
  Cluster cluster = Cluster.get(getContext().getSystem());

  // subscribe to cluster changes
  @Override
  public void preStart() {
    // #subscribe
    cluster.subscribe(
        getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
    // #subscribe
  }

  // re-subscribe when restart
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            MemberUp.class,
            mUp -> {
              log.info("Member is Up: {}", mUp.member());
            })
        .match(
            UnreachableMember.class,
            mUnreachable -> {
              log.info("Member detected as unreachable: {}", mUnreachable.member());
            })
        .match(
            MemberRemoved.class,
            mRemoved -> {
              log.info("Member is Removed: {}", mRemoved.member());
            })
        .match(
            MemberEvent.class,
            message -> {
              // ignore
            })
        .build();
  }
}

And the minimum configuration required is to set a host/port for remoting and the akka.actor.provider = "cluster".

akka {
  actor {
    provider = "cluster"
  }
  remote.artery {
    canonical {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka://[email protected]:2551",
      "akka://[email protected]:2552"]
  }
}

The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.

Cluster Membership API

This section shows the basic usage of the membership API. For the in-depth descriptions on joining, joining to seed nodes, downing and leaving of any node in the cluster please see the full Cluster Membership API documentation.

Joining to Seed Nodes

The seed nodes are initial contact points for joining a cluster, which can be done in different ways:

After the joining process the seed nodes are not special and they participate in the cluster in exactly the same way as other nodes.

Joining programmatically to seed nodes

You may also join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API.

Scala
import akka.actor.Address
import akka.cluster.Cluster

val cluster = Cluster(system)
val list: List[Address] = ??? //your method to dynamically get seed nodes
cluster.joinSeedNodes(list)
Java
import akka.actor.Address;
import akka.cluster.Cluster;

final Cluster cluster = Cluster.get(system);
List<Address> list =
    new LinkedList<>(); // replace this with your method to dynamically get seed nodes
cluster.joinSeedNodes(list);

For more information see tuning joins

It’s also possible to specifically join a single node as illustrated in below example, but joinSeedNodes should be preferred since it has redundancy and retry mechanisms built-in.

Scala
val cluster = Cluster(context.system)
  cluster.join(cluster.selfAddress)
Java
Cluster cluster = Cluster.get(getContext().getSystem());
  cluster.join(cluster.selfAddress());

Leaving

See Leaving in the documentation of the new APIs.

Downing

See Downing in the documentation of the new APIs.

Subscribe to Cluster Events

You can subscribe to change notifications of the cluster membership by using Cluster(system).subscribeCluster.get(system).subscribe.

Scala
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
Java
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);

A snapshot of the full state, akka.cluster.ClusterEvent.CurrentClusterState, is sent to the subscriber as the first message, followed by events for incremental updates.

Note that you may receive an empty CurrentClusterState, containing no members, followed by MemberUp events from other nodes which already joined, if you start the subscription before the initial join procedure has completed. This may for example happen when you start the subscription immediately after cluster.join() like below. This is expected behavior. When the node has been accepted in the cluster you will receive MemberUp for that node, and other nodes.

Scala
val cluster = Cluster(context.system)
  cluster.join(cluster.selfAddress)
cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
Java
Cluster cluster = Cluster.get(getContext().getSystem());
  cluster.join(cluster.selfAddress());
cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class);

To avoid receiving an empty CurrentClusterState at the beginning, you can use it like shown in the following example, to defer subscription until the MemberUp event for the own node is received:

Scala
val cluster = Cluster(context.system)
  cluster.join(cluster.selfAddress)
cluster.registerOnMemberUp {
  cluster.subscribe(self, classOf[MemberEvent], classOf[UnreachableMember])
}
Java
Cluster cluster = Cluster.get(getContext().getSystem());
  cluster.join(cluster.selfAddress());
cluster.registerOnMemberUp(
    () -> cluster.subscribe(getSelf(), MemberEvent.class, UnreachableMember.class));

If you find it inconvenient to handle the CurrentClusterState you can use ClusterEvent.InitialStateAsEvents ClusterEvent.initialStateAsEvents() as parameter to subscribe. That means that instead of receiving CurrentClusterState as the first message you will receive the events corresponding to the current state to mimic what you would have seen if you were listening to the events when they occurred in the past. Note that those initial events only correspond to the current state and it is not the full history of all changes that actually has occurred in the cluster.

Scala
cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
Java
cluster.subscribe(
    getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);

Worker Dial-in Example

Let’s take a look at an example that illustrates how workers, here named backend, can detect and register to new master nodes, here named frontend.

The example application provides a service to transform text. When some text is sent to one of the frontend services, it will be delegated to one of the backend workers, which performs the transformation job, and sends the result back to the original client. New backend nodes, as well as new frontend nodes, can be added or removed to the cluster dynamically.

Messages:

Scala
final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration
Java
public interface TransformationMessages {

  public static class TransformationJob implements Serializable {
    private final String text;

    public TransformationJob(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }
  }

  public static class TransformationResult implements Serializable {
    private final String text;

    public TransformationResult(String text) {
      this.text = text;
    }

    public String getText() {
      return text;
    }

    @Override
    public String toString() {
      return "TransformationResult(" + text + ")";
    }
  }

  public static class JobFailed implements Serializable {
    private final String reason;
    private final TransformationJob job;

    public JobFailed(String reason, TransformationJob job) {
      this.reason = reason;
      this.job = job;
    }

    public String getReason() {
      return reason;
    }

    public TransformationJob getJob() {
      return job;
    }

    @Override
    public String toString() {
      return "JobFailed(" + reason + ")";
    }
  }

  public static final String BACKEND_REGISTRATION = "BackendRegistration";
}

The backend worker that performs the transformation job:

Scala
class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up).foreach(register)
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") !
      BackendRegistration
}
Java
public class TransformationBackend extends AbstractActor {

  Cluster cluster = Cluster.get(getContext().getSystem());

  // subscribe to cluster changes, MemberUp
  @Override
  public void preStart() {
    cluster.subscribe(getSelf(), MemberUp.class);
  }

  // re-subscribe when restart
  @Override
  public void postStop() {
    cluster.unsubscribe(getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            TransformationJob.class,
            job -> {
              getSender().tell(new TransformationResult(job.getText().toUpperCase()), getSelf());
            })
        .match(
            CurrentClusterState.class,
            state -> {
              for (Member member : state.getMembers()) {
                if (member.status().equals(MemberStatus.up())) {
                  register(member);
                }
              }
            })
        .match(
            MemberUp.class,
            mUp -> {
              register(mUp.member());
            })
        .build();
  }

  void register(Member member) {
    if (member.hasRole("frontend"))
      getContext()
          .actorSelection(member.address() + "/user/frontend")
          .tell(BACKEND_REGISTRATION, getSelf());
  }
}

Note that the TransformationBackend actor subscribes to cluster events to detect new, potential, frontend nodes, and send them a registration message so that they know that they can use the backend worker.

The frontend that receives user jobs and delegates to one of the registered backend workers:

Scala
class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size).forward(job)

    case BackendRegistration if !backends.contains(sender()) =>
      context.watch(sender())
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}
Java
public class TransformationFrontend extends AbstractActor {

  List<ActorRef> backends = new ArrayList<ActorRef>();
  int jobCounter = 0;

  @Override
  public Receive createReceive() {
    return receiveBuilder()
        .match(
            TransformationJob.class,
            job -> backends.isEmpty(),
            job -> {
              getSender()
                  .tell(new JobFailed("Service unavailable, try again later", job), getSender());
            })
        .match(
            TransformationJob.class,
            job -> {
              jobCounter++;
              backends.get(jobCounter % backends.size()).forward(job, getContext());
            })
        .matchEquals(
            BACKEND_REGISTRATION,
            x -> {
              getContext().watch(getSender());
              backends.add(getSender());
            })
        .match(
            Terminated.class,
            terminated -> {
              backends.remove(terminated.getActor());
            })
        .build();
  }
}

Note that the TransformationFrontend actor watch the registered backend to be able to remove it from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.

Node Roles

See Cluster Node Roles in the documentation of the new APIs.

How To Startup when Cluster Size Reached

See How to startup when a minimum number of members in the cluster is reached in the documentation of the new APIs.

How To Startup when Member is Up

A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.

With a configuration option you can define required number of members before the leader changes member status of ‘Joining’ members to ‘Up’.:

akka.cluster.min-nr-of-members = 3

In a similar way you can define required number of members of a certain role before the leader changes member status of ‘Joining’ members to ‘Up’.:

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

You can start actors or trigger any functions using the registerOnMemberUp callback, which will be invoked when the current member status is changed to ‘Up’. This can additionally be used with akka.cluster.min-nr-of-members optional configuration to defer an action until the cluster has reached a certain size.

Scala
Cluster(system).registerOnMemberUp {
  system.actorOf(Props(classOf[FactorialFrontend], upToN, true), name = "factorialFrontend")
}
Java
Cluster.get(system)
    .registerOnMemberUp(
        new Runnable() {
          @Override
          public void run() {
            system.actorOf(
                Props.create(FactorialFrontend.class, upToN, true), "factorialFrontend");
          }
        });

How To Cleanup when Member is Removed

You can do some clean up in a registerOnMemberRemoved callback, which will be invoked when the current member status is changed to ‘Removed’ or the cluster have been shutdown.

An alternative is to register tasks to the Coordinated Shutdown.

Note

Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on the caller thread, otherwise it will be invoked later when the current member status changed to ‘Removed’. You may want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting down when you installing, and depending on the race is not healthy.

Higher level Cluster tools

Cluster Singleton

For some use cases it is convenient or necessary to ensure only one actor of a certain type is running somewhere in the cluster. This can be implemented by subscribing to member events, but there are several corner cases to consider. Therefore, this specific use case is covered by the Cluster Singleton.

See Cluster Singleton.

Cluster Sharding

Distributes actors across several nodes in the cluster and supports interaction with the actors using their logical identifier, but without having to care about their physical location in the cluster.

See Cluster Sharding.

Distributed Data

Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API.

See Distributed Data.

Distributed Publish Subscribe

Publish-subscribe messaging between actors in the cluster, and point-to-point messaging using the logical path of the actors, i.e. the sender does not have to know on which node the destination actor is running.

See Cluster Distributed Publish Subscribe.

Cluster Aware Routers

All routers can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster, additional routees are added to the router, according to the configuration.

See Cluster Aware Routers and Routers.

Cluster across multiple data centers

Akka Cluster can be used across multiple data centers, availability zones or regions, so that one Cluster can span multiple data centers and still be tolerant to network partitions.

See Cluster Multi-DC.

Cluster Client

Communication from an actor system that is not part of the cluster to actors running somewhere in the cluster. The client does not have to know on which node the destination actor is running.

See Cluster Client.

Cluster Metrics

The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus.

See Cluster Metrics.

Failure Detector

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. Please see:

How to Test

Multi Node Testing is useful for testing cluster applications.

Set up your project according to the instructions in Multi Node Testing and Multi JVM Testing, i.e. add the sbt-multi-jvm plugin and the dependency to akka-multi-node-testkit.

First, as described in Multi Node Testing, we need some scaffolding to configure the MultiNodeSpec. Define the participating roles and their configuration in an object extending MultiNodeConfig:

import akka.remote.testkit.MultiNodeConfig
import com.typesafe.config.ConfigFactory

object StatsSampleSpecConfig extends MultiNodeConfig {
  // register the named roles (nodes) of the test
  val first = role("first")
  val second = role("second")
  val third = role("third")

  def nodeList = Seq(first, second, third)

  // Extract individual sigar library for every node.
  nodeList.foreach { role =>
    nodeConfig(role) {
      ConfigFactory.parseString(s"""
      # Enable metrics extension in akka-cluster-metrics.
      akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
      # Sigar native library extract location during tests.
      akka.cluster.metrics.native-library-extract-folder=target/native/${role.name}
      """)
    }
  }

  // this configuration will be used for all nodes
  // note that no fixed host names and ports are used
  commonConfig(ConfigFactory.parseString("""
    akka.actor.provider = cluster
    akka.remote.classic.log-remote-lifecycle-events = off
    akka.cluster.roles = [compute]
    akka.actor.deployment {
      /statsService/workerRouter {
          router = consistent-hashing-group
          routees.paths = ["/user/statsWorker"]
          cluster {
            enabled = on
            allow-local-routees = on
            use-roles = ["compute"]
          }
        }
    }
    """))

}

Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be implemented differently, but often they are the same and extend an abstract test class, as illustrated here.

// need one concrete test class per node
class StatsSampleSpecMultiJvmNode1 extends StatsSampleSpec
class StatsSampleSpecMultiJvmNode2 extends StatsSampleSpec
class StatsSampleSpecMultiJvmNode3 extends StatsSampleSpec

Note the naming convention of these classes. The name of the classes must end with MultiJvmNode1, MultiJvmNode2 and so on. It is possible to define another suffix to be used by the sbt-multi-jvm, but the default should be fine in most cases.

Then the abstract MultiNodeSpec, which takes the MultiNodeConfig as constructor parameter.

import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike }

abstract class StatsSampleSpec
    extends MultiNodeSpec(StatsSampleSpecConfig)
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ImplicitSender {

  import StatsSampleSpecConfig._

  override def initialParticipants = roles.size

  override def beforeAll() = multiNodeSpecBeforeAll()

  override def afterAll() = multiNodeSpecAfterAll()

Most of this can be extracted to a separate trait to avoid repeating this in all your tests.

Typically you begin your test by starting up the cluster and let the members join, and create some actors. That can be done like this:

"illustrate how to startup cluster" in within(15 seconds) {
  Cluster(system).subscribe(testActor, classOf[MemberUp])
  expectMsgClass(classOf[CurrentClusterState])

  val firstAddress = node(first).address
  val secondAddress = node(second).address
  val thirdAddress = node(third).address

  Cluster(system).join(firstAddress)

  system.actorOf(Props[StatsWorker], "statsWorker")
  system.actorOf(Props[StatsService], "statsService")

  receiveN(3).collect { case MemberUp(m) => m.address }.toSet should be(
    Set(firstAddress, secondAddress, thirdAddress))

  Cluster(system).unsubscribe(testActor)

  testConductor.enter("all-up")
}

From the test you interact with the cluster using the Cluster extension, e.g. join.

Cluster(system).join(firstAddress)

Notice how the testActor from testkit is added as subscriber to cluster changes and then waiting for certain events, such as in this case all members becoming ‘Up’.

The above code was running for all roles (JVMs). runOn is a convenient utility to declare that a certain block of code should only run for a specific role.

"show usage of the statsService from one node" in within(15 seconds) {
  runOn(second) {
    assertServiceOk()
  }

  testConductor.enter("done-2")
}

def assertServiceOk(): Unit = {
  val service = system.actorSelection(node(third) / "user" / "statsService")
  // eventually the service should be ok,
  // first attempts might fail because worker actors not started yet
  awaitAssert {
    service ! StatsJob("this is the text that will be analyzed")
    expectMsgType[StatsResult](1.second).meanWordLength should be(3.875 +- 0.001)
  }

}

Once again we take advantage of the facilities in testkit to verify expected behavior. Here using testActor as sender (via ImplicitSender) and verifying the reply with expectMsgPF.

In the above code you can see node(third), which is useful facility to get the root actor reference of the actor system for a specific role. This can also be used to grab the akka.actor.Address of that node.

val firstAddress = node(first).address
val secondAddress = node(second).address
val thirdAddress = node(third).address

Currently testing with the sbt-multi-jvm plugin is only documented for Scala. Go to the corresponding Scala version of this page for details.

Management

There are several management tools for the cluster. Please refer to the Cluster Management for more information.

Command Line

Warning

Deprecation warning - The command line script has been deprecated and is scheduled for removal in the next major version. Use the HTTP management API with curl or similar instead.

The cluster can be managed with the script akka-cluster provided in the Akka GitHub repository here. Place the script and the jmxsh-R5.jar library in the same directory.

Run it without parameters to see instructions about how to use the script:

Usage: ./akka-cluster <node-hostname> <jmx-port> <command> ...

Supported commands are:
           join <node-url> - Sends request a JOIN node with the specified URL
          leave <node-url> - Sends a request for node with URL to LEAVE the cluster
           down <node-url> - Sends a request for marking node with URL as DOWN
             member-status - Asks the member node for its current status
                   members - Asks the cluster for addresses of current members
               unreachable - Asks the cluster for addresses of unreachable members
            cluster-status - Asks the cluster for its current status (member ring,
                             unavailable nodes, meta data etc.)
                    leader - Asks the cluster who the current leader is
              is-singleton - Checks if the cluster is a singleton cluster (single
                             node cluster)
              is-available - Checks if the member node is available
Where the <node-url> should be on the format of
  'akka.<protocol>://<actor-system-name>@<hostname>:<port>'

Examples: ./akka-cluster localhost 9999 is-available
          ./akka-cluster localhost 9999 join akka://[email protected]:2552
          ./akka-cluster localhost 9999 cluster-status

To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, as described in Monitoring and Management Using JMX Technology. Make sure you understand the security implications of enabling remote monitoring and management.

Configuration

There are several configuration properties for the cluster, and the full reference configuration for complete information.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.