Cluster
Dependency
To use Akka Cluster Typed, you must add the following dependency in your project:
- sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5.32"
- Maven
<dependencies> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-cluster-typed_2.12</artifactId> <version>2.5.32</version> </dependency> </dependencies>
- Gradle
dependencies { implementation "com.typesafe.akka:akka-cluster-typed_2.12:2.5.32" }
Introduction
For an introduction to Akka Cluster concepts see Cluster Specification. This documentation shows how to use the typed Cluster API.
This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.
Examples
All of the examples below assume the following imports:
- Scala
-
source
import akka.actor.typed._ import akka.actor.typed.scaladsl._ import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus import akka.cluster.typed._
- Java
-
source
import akka.actor.typed.*; import akka.actor.typed.javadsl.*; import akka.cluster.ClusterEvent; import akka.cluster.typed.*;
And the minimum configuration required is to set a host/port for remoting and the akka.actor.provider = "cluster"
.
sourceakka {
actor {
provider = "cluster"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2551",
"akka.tcp://[email protected]:2552"]
}
}
Cluster API extension
The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription of cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc). Those are exposed as two different actor references, i.e. it’s a message based API.
The references are on the Cluster
extension:
The Cluster extensions gives you access to:
- manager: An
ActorRef[ClusterCommand]
ActorRef<ClusterCommand>
where aClusterCommand
is a command such as:Join
,Leave
andDown
- subscriptions: An
ActorRef[ClusterStateSubscription]
where aClusterStateSubscription
is one ofGetCurrentState
orSubscribe
andUnsubscribe
to cluster events likeMemberRemoved
- state: The current
CurrentClusterState
Cluster Management
If not using configuration to specify seeds joining the cluster can be done programmatically via the manager
.
- Scala
-
source
cluster1.manager ! Join(cluster1.selfMember.address)
- Java
-
source
cluster.manager().tell(Join.create(cluster.selfMember().address()));
Leaving and downing are similar e.g.
- Scala
-
source
cluster2.manager ! Leave(cluster2.selfMember.address)
- Java
-
source
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
Cluster subscriptions
Cluster subscriptions
can be used to receive messages when cluster state changes. For example, registering for all MemberEvent
s, then using the manager
to have a node leave the cluster will result in events for the node going through the lifecycle described in Cluster Specification.
This example subscribes with a TestProbe
but in a real application it would be an Actor:
- Scala
-
source
val probe1 = TestProbe[MemberEvent]()(system1) cluster1.subscriptions ! Subscribe(probe1.ref, classOf[MemberEvent])
- Java
-
source
TestProbe<ClusterEvent.MemberEvent> testProbe = TestProbe.create(system); cluster .subscriptions() .tell(Subscribe.create(testProbe.ref(), ClusterEvent.MemberEvent.class));
Then asking a node to leave:
- Scala
-
source
cluster1.manager ! Leave(cluster2.selfMember.address) probe1.within(10.seconds) { probe1.expectMessageType[MemberLeft].member.address shouldEqual cluster2.selfMember.address probe1.expectMessageType[MemberExited].member.address shouldEqual cluster2.selfMember.address probe1.expectMessageType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address }
- Java
-
source
cluster.manager().tell(Leave.create(cluster2.selfMember().address())); testProbe.expectMessageClass(ClusterEvent.MemberLeft.class); testProbe.expectMessageClass(ClusterEvent.MemberExited.class); testProbe.expectMessageClass(ClusterEvent.MemberRemoved.class);
Serialization
See serialization for how messages are sent between ActorSystems. Actor references are typically included in the messages, since there is no sender
. To serialize actor references to/from string representation you will use the ActorRefResolver
. For example here’s how a serializer could look for the Ping
and Pong
messages above:
- Scala
-
source
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { private val actorRefResolver = ActorRefResolver(system.toTyped) private val PingManifest = "a" private val PongManifest = "b" override def identifier = 41 override def manifest(msg: AnyRef) = msg match { case _: Ping => PingManifest case Pong => PongManifest } override def toBinary(msg: AnyRef) = msg match { case Ping(who) => actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) case Pong => Array.emptyByteArray } override def fromBinary(bytes: Array[Byte], manifest: String) = { manifest match { case PingManifest => val str = new String(bytes, StandardCharsets.UTF_8) val ref = actorRefResolver.resolveActorRef[Pong.type](str) Ping(ref) case PongManifest => Pong } } }
- Java
-
source
public class PingSerializer extends SerializerWithStringManifest { final ExtendedActorSystem system; final ActorRefResolver actorRefResolver; static final String PING_MANIFEST = "a"; static final String PONG_MANIFEST = "b"; PingSerializer(ExtendedActorSystem system) { this.system = system; actorRefResolver = ActorRefResolver.get(Adapter.toTyped(system)); } @Override public int identifier() { return 97876; } @Override public String manifest(Object obj) { if (obj instanceof Ping) return PING_MANIFEST; else if (obj instanceof Pong) return PONG_MANIFEST; else throw new IllegalArgumentException("Unknown type: " + obj); } @Override public byte[] toBinary(Object obj) { if (obj instanceof Ping) return actorRefResolver .toSerializationFormat(((Ping) obj).replyTo) .getBytes(StandardCharsets.UTF_8); else if (obj instanceof Pong) return new byte[0]; else throw new IllegalArgumentException("Unknown type: " + obj); } @Override public Object fromBinary(byte[] bytes, String manifest) throws NotSerializableException { if (PING_MANIFEST.equals(manifest)) { String str = new String(bytes, StandardCharsets.UTF_8); ActorRef<Pong> ref = actorRefResolver.resolveActorRef(str); return new Ping(ref); } else if (PONG_MANIFEST.equals(manifest)) { return new Pong(); } else { throw new NotSerializableException("Unable to handle manifest: " + manifest); } } }