Cluster

Warning

This module is currently marked as may change, in the sense that it is the subject of active research. The API or semantics can change without warning or a deprecation period, and it is not recommended to use this module in production yet.

To use the typed Cluster API, add the following dependency:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5-SNAPSHOT"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.5-SNAPSHOT</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.5-SNAPSHOT'
}

For an introduction to Akka Cluster concepts see Cluster Specification. This documentation shows how to use the typed Cluster API. All of the examples below assume the following imports:

Scala
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.cluster.typed._
Java
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.*;

The minimum configuration required is to set a host and port for remoting and the cluster.
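As a minimal sketch of such a configuration, the settings can be built in code with the Typesafe Config library. This assumes the classic Netty-based remoting used by Akka 2.5; with Artery the host and port are set under `akka.remote.artery.canonical` instead. The object name `MinimalClusterConfig`, the host `127.0.0.1` and the port `2551` are illustrative values, not requirements.

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object MinimalClusterConfig {
  // Illustrative values: adjust the hostname and port to your environment.
  // Assumes classic (Netty TCP) remoting as used by Akka 2.5.
  val config: Config = ConfigFactory.parseString("""
    akka {
      actor.provider = "cluster"
      remote.netty.tcp {
        hostname = "127.0.0.1"
        port = 2551
      }
    }
    """)
}
```

The resulting `Config` can be passed in when the `ActorSystem` is created, or the same settings can be placed in `application.conf`.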


Cluster API extension

The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription to cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc.). These are exposed as two different actor references, that is, it is a message-based API.

The references are on the Cluster extension:

Scala
val cluster1 = Cluster(system)
Java
Cluster cluster = Cluster.get(system);

The Cluster extension gives you access to:

  • manager: An ActorRef[ClusterCommand] where a ClusterCommand is a command such as: Join, Leave and Down
  • subscriptions: An ActorRef[ClusterStateSubscription] where a ClusterStateSubscription is one of GetCurrentState, or Subscribe and Unsubscribe for cluster events such as MemberRemoved
  • state: The current CurrentClusterState
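Unlike the two actor references, `state` is read directly rather than through messages. A minimal sketch of reading it, assuming a helper object named `ClusterStateExample` (a hypothetical name, not part of Akka), might look like this:

```scala
import akka.actor.Address
import akka.actor.typed.ActorSystem
import akka.cluster.MemberStatus
import akka.cluster.typed.Cluster

object ClusterStateExample {
  // Addresses of the members that this node currently sees as Up.
  // `state` is a snapshot, so the result may be outdated by the time it is used.
  def upMembers(system: ActorSystem[_]): List[Address] =
    Cluster(system).state.members.toList.collect {
      case member if member.status == MemberStatus.Up => member.address
    }
}
```

For reacting to changes over time, the `subscriptions` reference is usually a better fit than polling `state`.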

Cluster Management

If you are not using configuration to specify seed nodes, joining the cluster can be done programmatically via the manager.

Scala
cluster1.manager ! Join(cluster1.selfMember.address)
Java
cluster.manager().tell(Join.create(cluster.selfMember().address()));

Leaving and downing are similar, for example:

Scala
cluster2.manager ! Leave(cluster2.selfMember.address)
Java
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
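Downing follows the same pattern as leaving, using the `Down` command from `akka.cluster.typed`. The following is a small sketch; the `DowningExample` object and its `down` helper are hypothetical names introduced only for illustration.

```scala
import akka.actor.Address
import akka.cluster.typed.{ Cluster, Down }

object DowningExample {
  // Sends a Down command for the node at `address` to the cluster manager.
  def down(cluster: Cluster, address: Address): Unit =
    cluster.manager ! Down(address)
}
```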

Cluster subscriptions

Cluster subscriptions can be used to receive messages when the cluster state changes. For example, if you register for all MemberEvents and then use the manager to have a node leave the cluster, you will receive events for that node as it goes through the lifecycle described in Cluster Specification.

This example subscribes with a TestProbe, but in a real application the subscriber would be an actor:

Scala
val probe1 = TestProbe[MemberEvent]()(system1)
cluster1.subscriptions ! Subscribe(probe1.ref, classOf[MemberEvent])
Java
TestProbe<ClusterEvent.MemberEvent> testProbe = new TestProbe<>(system);
cluster.subscriptions().tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class));

Then asking a node to leave:

Scala
cluster1.manager ! Leave(cluster2.selfMember.address)
probe1.within(10.seconds) {
  probe1.expectMsgType[MemberLeft].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMsgType[MemberExited].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMsgType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address
}
Java
cluster.manager().tell(Leave.create(cluster2.selfMember().address()));
testProbe.expectMsgType(ClusterEvent.MemberLeft.class);
testProbe.expectMsgType(ClusterEvent.MemberExited.class);
testProbe.expectMsgType(ClusterEvent.MemberRemoved.class);
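The examples in this section subscribe with a TestProbe. As a minimal sketch of the actor-based variant, assuming an Akka version that provides `Behaviors.setup` and `Behaviors.receiveMessage`, a subscriber could be written as follows. The `ClusterListener` name is hypothetical and not part of Akka.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ MemberEvent, MemberRemoved, MemberUp }
import akka.cluster.typed.{ Cluster, Subscribe }

object ClusterListener {
  def apply(): Behavior[MemberEvent] =
    Behaviors.setup { ctx =>
      // Register this actor for all membership events when it starts
      Cluster(ctx.system).subscriptions ! Subscribe(ctx.self, classOf[MemberEvent])

      Behaviors.receiveMessage {
        case MemberUp(member) =>
          ctx.log.info(s"Member up: ${member.address}")
          Behaviors.same
        case MemberRemoved(member, previousStatus) =>
          ctx.log.info(s"Member removed: ${member.address} (was $previousStatus)")
          Behaviors.same
        case _ =>
          // Other membership events are ignored in this sketch
          Behaviors.same
      }
    }
}
```

Spawning this behavior, for example with `ctx.spawn(ClusterListener(), "cluster-listener")` from another actor, starts the subscription.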

Serialization

See serialization for how messages are sent between ActorSystems. Actor references are typically included in the messages, since there is no sender. To serialize actor references to and from their string representation, use the ActorRefResolver. For example, here is how a serializer could look for a Ping message that carries the actor reference to reply to, and for the Pong reply:

Scala
import java.nio.charset.StandardCharsets

import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.SerializerWithStringManifest

class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
  // Converts typed ActorRefs to and from their string representation
  private val actorRefResolver = ActorRefResolver(system.toTyped)

  private val PingManifest = "a"
  private val PongManifest = "b"

  // Must be unique among the serializers registered in the ActorSystem
  override def identifier = 41

  override def manifest(msg: AnyRef) = msg match {
    case _: Ping ⇒ PingManifest
    case Pong    ⇒ PongManifest
  }

  override def toBinary(msg: AnyRef) = msg match {
    case Ping(who) ⇒
      // A Ping only needs to carry the reference to reply to
      actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
    case Pong ⇒
      Array.emptyByteArray
  }

  override def fromBinary(bytes: Array[Byte], manifest: String) = {
    manifest match {
      case PingManifest ⇒
        val str = new String(bytes, StandardCharsets.UTF_8)
        val ref = actorRefResolver.resolveActorRef[Pong.type](str)
        Ping(ref)
      case PongManifest ⇒
        Pong
    }
  }
}
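The serializer relies on message definitions and a serialization binding that are not shown on this page. The following sketch shows one shape they could take, inferred from how the serializer uses `Ping` and `Pong`. The package name `docs` and the object name `PingSerializationConfig` are assumptions made for illustration; replace them with the names used in your project.

```scala
import akka.actor.typed.ActorRef
import com.typesafe.config.{ Config, ConfigFactory }

// Assumed message protocol: a Ping carries the reference to reply to,
// because typed actors have no implicit sender.
case object Pong
final case class Ping(who: ActorRef[Pong.type])

object PingSerializationConfig {
  // Hypothetical package `docs`. The runtime class of the Pong object is `Pong$`.
  val config: Config = ConfigFactory.parseString("""
    akka.actor {
      serializers {
        ping = "docs.PingSerializer"
      }
      serialization-bindings {
        "docs.Ping" = ping
        "docs.Pong$" = ping
      }
    }
    """)
}
```

With bindings along these lines in place, messages of both types sent to a remote actor would be handled by the PingSerializer.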