Cluster

Dependency

To use Akka Cluster Typed, add the following dependency to your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5.32"
Maven
<dependencies>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-typed_2.12</artifactId>
    <version>2.5.32</version>
  </dependency>
</dependencies>
Gradle
dependencies {
  implementation "com.typesafe.akka:akka-cluster-typed_2.12:2.5.32"
}

Introduction

For an introduction to Akka Cluster concepts see Cluster Specification. This documentation shows how to use the typed Cluster API.

Note

This module is ready to be used in production, but it is still marked as "may change". This means that the API or semantics can change without warning or a deprecation period, but such changes will be collected and performed in Akka 2.6.0 rather than in 2.5.x patch releases.

Examples

All of the examples below assume the following imports:

Scala
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.cluster.typed._
Java
import akka.actor.typed.*;
import akka.actor.typed.javadsl.*;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.*;

The minimum configuration required is a host/port for remoting and akka.actor.provider = "cluster".

akka {
  actor {
    provider = "cluster"
  }
  remote {
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
  }
}
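
The seed-nodes list names two addresses, on ports 2551 and 2552, while the configuration shown binds only port 2551. A second node on the same host would presumably reuse the same settings with a different port. A minimal sketch, assuming the configuration above is saved as application.conf and that the file below is loaded for the second node (both file arrangements are assumptions, not part of the original example):

```hocon
# Hypothetical configuration for a second node on the same host.
# Pulls in everything from application.conf, then overrides the remoting port.
include "application"

akka.remote.netty.tcp.port = 2552
```

All other settings, including provider = "cluster" and the seed-nodes list, stay identical on both nodes.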

Cluster API extension

The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription to cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc.). These are exposed as two different actor references, i.e. it’s a message-based API.

The references are on the Cluster extension:

Scala
val cluster1 = Cluster(system)
Java
Cluster cluster = Cluster.get(system);

The Cluster extension gives you access to:

  • manager: An ActorRef[ClusterCommand] (Scala) or ActorRef<ClusterCommand> (Java), where a ClusterCommand is a command such as Join, Leave or Down
  • subscriptions: An ActorRef[ClusterStateSubscription], where a ClusterStateSubscription is one of GetCurrentState, Subscribe or Unsubscribe; the latter two manage subscriptions to cluster events such as MemberRemoved
  • state: The current CurrentClusterState
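
Unlike manager and subscriptions, the state can be read directly without sending a message. The following is a minimal sketch of inspecting it, assuming an already started typed ActorSystem; the object and method names (ClusterInfo, describe) are hypothetical and only serve the illustration:

```scala
import akka.actor.typed.ActorSystem
import akka.cluster.MemberStatus
import akka.cluster.typed.Cluster

object ClusterInfo {
  // Hypothetical helper: summarises what this node currently knows about the cluster.
  def describe(system: ActorSystem[_]): String = {
    val cluster = Cluster(system)
    val self = cluster.selfMember // this node's own Member entry
    val state = cluster.state     // snapshot of type CurrentClusterState

    // Members that have completed joining and are in the Up status.
    val up = state.members.filter(_.status == MemberStatus.Up)

    s"self=${self.address} status=${self.status} up=${up.size} " +
      s"unreachable=${state.unreachable.size} leader=${state.leader.getOrElse("none")}"
  }
}
```

The returned snapshot reflects this node's view at the time of the call; to react to later changes, use subscriptions instead.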

Cluster Management

If seed nodes are not specified in configuration, joining the cluster can be done programmatically via the manager.

Scala
cluster1.manager ! Join(cluster1.selfMember.address)
Java
cluster.manager().tell(Join.create(cluster.selfMember().address()));

Leaving and downing are similar, e.g.

Scala
cluster2.manager ! Leave(cluster2.selfMember.address)
Java
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
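
Only Leave is shown above. Downing follows the same pattern with the Down command. A minimal sketch, in which the wrapper object and method (Downing, down) are hypothetical:

```scala
import akka.actor.Address
import akka.cluster.typed.{ Cluster, Down }

object Downing {
  // Hypothetical helper: asks the cluster to mark the node at `address` as down.
  // The address is typically taken from a Member, e.g. member.address.
  def down(cluster: Cluster, address: Address): Unit =
    cluster.manager ! Down(address)
}
```

As with Join and Leave, the command is sent as a message to the manager, so the call returns immediately and the effect shows up later as cluster events.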

Cluster subscriptions

Cluster subscriptions can be used to receive messages when the cluster state changes. For example, registering for all MemberEvents and then using the manager to have a node leave the cluster will result in events for the node going through the lifecycle described in Cluster Specification.

This example subscribes with a TestProbe but in a real application it would be an Actor:

Scala
val probe1 = TestProbe[MemberEvent]()(system1)
cluster1.subscriptions ! Subscribe(probe1.ref, classOf[MemberEvent])
Java
TestProbe<ClusterEvent.MemberEvent> testProbe = TestProbe.create(system);
cluster
    .subscriptions()
    .tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class));

Then asking a node to leave:

Scala
cluster1.manager ! Leave(cluster2.selfMember.address)
probe1.within(10.seconds) {
  probe1.expectMessageType[MemberLeft].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMessageType[MemberExited].member.address shouldEqual cluster2.selfMember.address
  probe1.expectMessageType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address
}
Java
cluster.manager().tell(Leave.create(cluster2.selfMember().address()));
testProbe.expectMessageClass(ClusterEvent.MemberLeft.class);
testProbe.expectMessageClass(ClusterEvent.MemberExited.class);
testProbe.expectMessageClass(ClusterEvent.MemberRemoved.class);
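
The TestProbe in these examples stands in for a real subscriber. In an application the subscriber would be an actor that registers itself when it starts. The following is a minimal sketch of such an actor; the object name MembershipListener and the log messages are hypothetical, and only MemberUp and MemberRemoved are handled explicitly:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ MemberEvent, MemberRemoved, MemberUp }
import akka.cluster.typed.{ Cluster, Subscribe }

object MembershipListener {
  // Hypothetical listener actor that logs selected membership changes.
  val behavior: Behavior[MemberEvent] =
    Behaviors.setup[MemberEvent] { context =>
      // Register this actor for all MemberEvents as soon as it starts.
      Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[MemberEvent])

      Behaviors.receiveMessage[MemberEvent] {
        case MemberUp(member) =>
          context.log.info(s"Member is up: ${member.address}")
          Behaviors.same
        case MemberRemoved(member, previousStatus) =>
          context.log.info(s"Member removed: ${member.address} (was $previousStatus)")
          Behaviors.same
        case _ =>
          Behaviors.same // other MemberEvents are ignored in this sketch
      }
    }
}
```

Because the actor's message type is MemberEvent, the cluster events arrive as ordinary messages and no adapter is needed. An actor with its own protocol would instead wrap the events, for example with a message adapter.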

Serialization

See serialization for how messages are sent between ActorSystems. Because there is no sender, actor references are typically included in the messages themselves. To convert actor references to and from their string representation, use the ActorRefResolver. For example, here is how a serializer could look for a Ping message that carries the actor reference to reply to, and for the Pong reply:

Scala
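import java.nio.charset.StandardCharsets

import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.SerializerWithStringManifest

class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {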
  private val actorRefResolver = ActorRefResolver(system.toTyped)

  private val PingManifest = "a"
  private val PongManifest = "b"

  override def identifier = 41

  override def manifest(msg: AnyRef) = msg match {
    case _: Ping => PingManifest
    case Pong    => PongManifest
  }

  override def toBinary(msg: AnyRef) = msg match {
    case Ping(who) =>
      actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
    case Pong =>
      Array.emptyByteArray
  }

  override def fromBinary(bytes: Array[Byte], manifest: String) = {
    manifest match {
      case PingManifest =>
        val str = new String(bytes, StandardCharsets.UTF_8)
        val ref = actorRefResolver.resolveActorRef[Pong.type](str)
        Ping(ref)
      case PongManifest =>
        Pong
    }
  }
}
Java
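import java.io.NotSerializableException;
import java.nio.charset.StandardCharsets;

import akka.actor.ExtendedActorSystem;
import akka.serialization.SerializerWithStringManifest;

public class PingSerializer extends SerializerWithStringManifest {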

  final ExtendedActorSystem system;
  final ActorRefResolver actorRefResolver;

  static final String PING_MANIFEST = "a";
  static final String PONG_MANIFEST = "b";

  PingSerializer(ExtendedActorSystem system) {
    this.system = system;
    actorRefResolver = ActorRefResolver.get(Adapter.toTyped(system));
  }

  @Override
  public int identifier() {
    return 97876;
  }

  @Override
  public String manifest(Object obj) {
    if (obj instanceof Ping) return PING_MANIFEST;
    else if (obj instanceof Pong) return PONG_MANIFEST;
    else throw new IllegalArgumentException("Unknown type: " + obj);
  }

  @Override
  public byte[] toBinary(Object obj) {
    if (obj instanceof Ping)
      return actorRefResolver
          .toSerializationFormat(((Ping) obj).replyTo)
          .getBytes(StandardCharsets.UTF_8);
    else if (obj instanceof Pong) return new byte[0];
    else throw new IllegalArgumentException("Unknown type: " + obj);
  }

  @Override
  public Object fromBinary(byte[] bytes, String manifest) throws NotSerializableException {
    if (PING_MANIFEST.equals(manifest)) {
      String str = new String(bytes, StandardCharsets.UTF_8);
      ActorRef<Pong> ref = actorRefResolver.resolveActorRef(str);
      return new Ping(ref);
    } else if (PONG_MANIFEST.equals(manifest)) {
      return new Pong();
    } else {
      throw new NotSerializableException("Unable to handle manifest: " + manifest);
    }
  }
}
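
A serializer like this takes effect only once it is registered in configuration and bound to the message classes. A minimal sketch of that registration, assuming the serializer and messages live in a hypothetical package named docs (the package name and the ping alias are assumptions for illustration):

```hocon
akka.actor {
  serializers {
    # Alias for the serializer class; the package "docs" is hypothetical.
    ping = "docs.PingSerializer"
  }
  serialization-bindings {
    "docs.Ping" = ping
    # In Scala, Pong is an object, so its runtime class name ends in "$".
    # For the Java variant, bind "docs.Pong" instead.
    "docs.Pong$" = ping
  }
}
```

The identifier returned by the serializer must be unique among all serializers in the ActorSystem.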