Cluster
Dependency
To use Akka Cluster Typed, you must add the following dependency in your project:
Introduction
For an introduction to Akka Cluster concepts see Cluster Specification. This documentation shows how to use the typed Cluster API.
This module is ready to be used in production, but it is still marked as may change. This means that API or semantics can change without warning or deprecation period, but such changes will be collected and be performed in Akka 2.6.0 rather than in 2.5.x patch releases.
Examples
All of the examples below assume the following imports:
- Scala
-
source
import akka.actor.typed._ import akka.actor.typed.scaladsl._ import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus import akka.cluster.typed._
- Java
And the minimum configuration required is to set a host/port for remoting and the akka.actor.provider = "cluster"
.
sourceakka {
actor {
provider = "cluster"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
}
}
Cluster API extension
The typed Cluster extension gives access to management tasks (Joining, Leaving, Downing, …) and subscription of cluster membership events (MemberUp, MemberRemoved, UnreachableMember, etc). Those are exposed as two different actor references, i.e. it’s a message based API.
The references are on the Cluster
extension:
The Cluster extensions gives you access to:
- manager: An
ActorRef[ClusterCommand]
where aClusterCommand
is a command such as:Join
,Leave
andDown
- subscriptions: An
ActorRef[ClusterStateSubscription]
where aClusterStateSubscription
is one ofGetCurrentState
orSubscribe
andUnsubscribe
to cluster events likeMemberRemoved
- state: The current
CurrentClusterState
Cluster Management
If not using configuration to specify seeds joining the cluster can be done programmatically via the manager
.
Leaving and downing are similar e.g.
Cluster subscriptions
Cluster subscriptions
can be used to receive messages when cluster state changes. For example, registering for all MemberEvent
s, then using the manager
to have a node leave the cluster will result in events for the node going through the lifecycle described in Cluster Specification.
This example subscribes with a TestProbe
but in a real application it would be an Actor:
- Scala
-
source
val probe1 = TestProbe[MemberEvent]()(system1) cluster1.subscriptions ! Subscribe(probe1.ref, classOf[MemberEvent])
- Java
Then asking a node to leave:
- Scala
-
source
cluster1.manager ! Leave(cluster2.selfMember.address) probe1.within(10.seconds) { probe1.expectMessageType[MemberLeft].member.address shouldEqual cluster2.selfMember.address probe1.expectMessageType[MemberExited].member.address shouldEqual cluster2.selfMember.address probe1.expectMessageType[MemberRemoved].member.address shouldEqual cluster2.selfMember.address }
- Java
Serialization
See serialization for how messages are sent between ActorSystems. Actor references are typically included in the messages, since there is no sender
. To serialize actor references to/from string representation you will use the ActorRefResolver
. For example here’s how a serializer could look for the Ping
and Pong
messages above:
- Scala
-
source
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { private val actorRefResolver = ActorRefResolver(system.toTyped) private val PingManifest = "a" private val PongManifest = "b" override def identifier = 41 override def manifest(msg: AnyRef) = msg match { case _: Ping => PingManifest case Pong => PongManifest } override def toBinary(msg: AnyRef) = msg match { case Ping(who) => actorRefResolver.toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) case Pong => Array.emptyByteArray } override def fromBinary(bytes: Array[Byte], manifest: String) = { manifest match { case PingManifest => val str = new String(bytes, StandardCharsets.UTF_8) val ref = actorRefResolver.resolveActorRef[Pong.type](str) Ping(ref) case PongManifest => Pong } } }
- Java