The Aggregator is to be mixed into an actor to provide the aggregator behavior.
This actor is intended to be used on an external node that is not a member of the cluster.
ClusterClient connects to this actor. The ClusterReceptionist is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The receptionist can be started with the ClusterReceptionistExtension or as an ordinary actor (use the factory method ClusterReceptionist#props).
The receptionist forwards messages from the client to the associated DistributedPubSubMediator, i.e. the client can send messages to any actor in the cluster that is registered in the DistributedPubSubMediator. Messages from the client are wrapped in DistributedPubSubMediator.Send, DistributedPubSubMediator.SendToAll or DistributedPubSubMediator.Publish with the semantics described in DistributedPubSubMediator.
Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. the sender, as seen by the destination actor, is not the client itself. The sender of the response messages, as seen by the client, is preserved as the original sender, so the client can choose to send subsequent messages directly to the actor in the cluster.
Manages a singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. At most one singleton instance is running at any point in time.
The ClusterSingletonManager is supposed to be started on all nodes, or all nodes with specified role, in the cluster with actorOf. The actual singleton is started on the oldest node by creating a child actor from the supplied singletonProps.
The singleton actor is always running on the oldest member, which can be determined by akka.cluster.Member#isOlderThan. This can change when members are removed. A graceful hand-over can normally be performed when the current oldest node is leaving the cluster. Be aware that there is a short time period when there is no active singleton during the hand-over process.
The cluster failure detector will notice when the oldest node becomes unreachable due to things like JVM crash, hard shut down, or network failure. When the crashed node has been removed (via down) from the cluster a new oldest node will take over and a new singleton actor is created. For these failure scenarios there will not be a graceful hand-over, but more than one active singleton is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts.
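For example, a manager for a hypothetical Consumer singleton, running on nodes with the "worker" role, might be started like this (a sketch; the Consumer actor and all names are assumptions):
In Scala:
import akka.actor.{ PoisonPill, Props }
import akka.contrib.pattern.ClusterSingletonManager

system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = Props(classOf[Consumer]), // hypothetical singleton actor
    singletonName = "consumer",
    terminationMessage = PoisonPill,
    role = Some("worker")),
  name = "singleton")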
You access the singleton actor with actorSelection using the names you have specified when creating the ClusterSingletonManager. You can subscribe to akka.cluster.ClusterEvent.MemberEvent and sort the members by age (akka.cluster.Member#isOlderThan) to keep track of the oldest member. Alternatively the singleton actor may broadcast its existence when it is started.
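Continuing the sketch above, a selection of the singleton on the current oldest member might look like this (oldestMember is assumed state, tracked from the member events):
In Scala:
import akka.actor.RootActorPath

// oldestMember is tracked by subscribing to ClusterEvent.MemberEvent
val path = RootActorPath(oldestMember.address) / "user" / "singleton" / "consumer"
val consumer = context.actorSelection(path)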
Use the factory method to create the akka.actor.Props for the actor.
singletonProps akka.actor.Props of the singleton actor instance.
singletonName The actor name of the child singleton actor.
terminationMessage When handing over to a new oldest node this terminationMessage is sent to the singleton actor to tell it to finish its work, close resources, and stop. The hand-over to the new oldest node is completed when the singleton actor is terminated. Note that akka.actor.PoisonPill is a perfectly fine terminationMessage if you only need to stop the actor.
role Singleton among the nodes tagged with specified role. If the role is not specified it's a singleton among all nodes in the cluster.
maxHandOverRetries When a node is becoming oldest it sends a hand-over request to the previous oldest. This is retried with the retryInterval until the previous oldest confirms that the hand-over has started, or this maxHandOverRetries limit has been reached. If the retry limit is reached it takes the decision to be the new oldest if the previous oldest is unknown (typically removed), otherwise it initiates a new round by throwing akka.contrib.pattern.ClusterSingletonManagerIsStuck and expecting restart with fresh state. For a cluster with many members you might need to increase this retry limit because it takes longer to propagate changes across all nodes.
maxTakeOverRetries When an oldest node is no longer oldest it sends a take-over request to the new oldest to initiate the normal hand-over process. This is especially useful when a new node joins and becomes oldest immediately, without knowing who was the previous oldest. This is retried with the retryInterval until this retry limit has been reached. If the retry limit is reached it initiates a new round by throwing akka.contrib.pattern.ClusterSingletonManagerIsStuck and expecting restart with fresh state. This will also cause the singleton actor to be stopped. maxTakeOverRetries must be less than maxHandOverRetries to ensure that the new oldest doesn't start the singleton actor before the previous one is stopped in certain corner cases.
Thrown when a consistent state can't be determined within the defined retry limits. Eventually it will reach a stable state and can continue, and that is simplified by starting over with a clean state. The parent supervisor should typically restart the actor, i.e. the default decision.
The ClusterSingletonProxy works together with the akka.contrib.pattern.ClusterSingletonManager to provide a distributed proxy to the singleton actor.
The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash them when the singleton is finally available. The proxy mixes in the akka.actor.Stash trait, so it can be configured accordingly.
The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by periodically sending an akka.actor.Identify message until the singleton responds with its akka.actor.ActorIdentity.
Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the actors involved.
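For example, a proxy to the singleton from the manager sketch above might be started on each node like this (the path and role are assumptions):
In Scala:
import akka.contrib.pattern.ClusterSingletonProxy

system.actorOf(
  ClusterSingletonProxy.props(
    singletonPath = "/user/singleton/consumer",
    role = Some("worker")),
  name = "consumerProxy")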
This actor manages a registry of actor references and replicates the entries to peer actors among all cluster nodes or a group of nodes tagged with a specific role.
The DistributedPubSubMediator is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The mediator can be started with the DistributedPubSubExtension or as an ordinary actor.
Changes are only performed in the node's own part of the registry, and those changes are versioned. Deltas are disseminated in a scalable way to other nodes with a gossip protocol. The registry is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds.
You can send messages via the mediator on any node to registered actors on any other node. There are four modes of message delivery; a sketch of the first three follows the list below.
1. DistributedPubSubMediator.Send - The message will be delivered to one recipient with a matching path, if any such exists in the registry. If several entries match the path the message will be sent via the supplied routingLogic (default random) to one destination. The sender of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used mediator actor, if any such exists, otherwise route to any other matching entry. A typical usage of this mode is private chat to one other user in an instant messaging application. It can also be used for distributing tasks to registered workers, like a cluster aware router where the routees can dynamically register themselves.
2. DistributedPubSubMediator.SendToAll - The message will be delivered to all recipients with a matching path. Actors with the same path, without address information, can be registered on different nodes. On each node there can only be one such actor, since the path is unique within one local actor system. Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, for redundancy.
3. DistributedPubSubMediator.Publish - Actors may be registered to a named topic instead of path. This enables many subscribers on each node. The message will be delivered to all subscribers of the topic. For efficiency the message is sent over the wire only once per node (that has a matching topic), and then delivered to all subscribers of the local topic representation. This is the true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging application.
4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup - Actors may be subscribed to a named topic with an optional property group. If subscribing with a group name, each message published to a topic with the sendOneMessageToEachGroup flag is delivered via the supplied routingLogic (default random) to one actor within each subscribing group. If all the subscribed actors have the same group name, then this works just like DistributedPubSubMediator.Send and all messages are delivered to one subscriber. If all the subscribed actors have different group names, then this works like normal DistributedPubSubMediator.Publish and all messages are broadcast to all subscribers.
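A sketch of the first three delivery modes (the destination path and topic name are assumptions):
In Scala:
import akka.contrib.pattern.{ DistributedPubSubExtension, DistributedPubSubMediator }

val mediator = DistributedPubSubExtension(system).mediator
// 1. deliver to one actor registered at the path, preferring a local one
mediator ! DistributedPubSubMediator.Send("/user/destination", "hello", localAffinity = true)
// 2. deliver to all actors registered at the path
mediator ! DistributedPubSubMediator.SendToAll("/user/destination", "hello")
// 3. deliver to all subscribers of the topic
mediator ! DistributedPubSubMediator.Publish("content", "hello")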
You register actors to the local mediator with DistributedPubSubMediator.Put or DistributedPubSubMediator.Subscribe. Put is used together with Send and SendToAll message delivery modes. The ActorRef in Put must belong to the same local actor system as the mediator. Subscribe is used together with Publish.
Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Remove or DistributedPubSubMediator.Unsubscribe. Successful Subscribe and Unsubscribe are acknowledged with DistributedPubSubMediator.SubscribeAck and DistributedPubSubMediator.UnsubscribeAck replies.
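A sketch of registering with the local mediator (the actor and topic name are assumptions):
In Scala:
import akka.actor.Actor
import akka.contrib.pattern.{ DistributedPubSubExtension, DistributedPubSubMediator }

class Subscriber extends Actor {
  val mediator = DistributedPubSubExtension(context.system).mediator
  mediator ! DistributedPubSubMediator.Put(self) // for Send and SendToAll
  mediator ! DistributedPubSubMediator.Subscribe("content", self) // for Publish

  def receive = {
    case DistributedPubSubMediator.SubscribeAck(_) => // subscription confirmed
    case msg: String => // handle delivered messages here
  }
}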
Marker trait for remote messages with special serializer.
A ReliableProxy is a means to wrap a remote actor reference in order to obtain certain improved delivery guarantees. These guarantees are valid for the communication between the two end-points of the reliable “tunnel”, which usually spans an unreliable network.
Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.
Delivery from the remote end-point to the target actor is still subject to in-JVM delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory situations or other VM errors).
You can create a reliable connection like this:
In Scala:
val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds))
or in Java:
final ActorRef proxy = getContext().actorOf(ReliableProxy.props(target, Duration.create(100, "millis"), Duration.create(120, "seconds")));
Please note: the tunnel is uni-directional, and original sender information is retained, hence replies by the wrapped target reference will go back in the normal “unreliable” way unless also secured by a ReliableProxy from the remote end.
This actor is an akka.actor.FSM, hence it offers the service of transition callbacks to those actors which subscribe using the SubscribeTransitionCallBack and UnsubscribeTransitionCallBack messages; see akka.actor.FSM for more documentation. The proxy will transition into the ReliableProxy.Active state when ACKs are outstanding and return to the ReliableProxy.Idle state when every message sent so far has been confirmed by the peer end-point.
The initial state of the proxy is ReliableProxy.Connecting. In this state the proxy will repeatedly send akka.actor.Identify messages to ActorSelection(targetPath) in order to obtain a new ActorRef for the target. When an akka.actor.ActorIdentity for the target is received a new tunnel will be created, a ReliableProxy.TargetChanged message containing the target ActorRef will be sent to the proxy's transition subscribers and the proxy will transition into either the ReliableProxy.Idle or ReliableProxy.Active state, depending on whether there are any outstanding messages that need to be delivered. If maxConnectAttempts is defined this actor will stop itself after Identify is sent maxConnectAttempts times.
While in the Idle or Active states, if a communication failure causes the tunnel to terminate via Remote Deathwatch the proxy will transition into the ReliableProxy.Connecting state as described above. After reconnecting, TargetChanged will be sent only if the target ActorRef has changed.
If this actor is stopped and it still has outstanding messages a ReliableProxy.ProxyTerminated message will be sent to the transition subscribers. It contains an Unsent object with the outstanding messages.
If a ReliableProxy.Unsent message is sent to this actor the messages contained within it will be relayed through the tunnel to the target.
Any other message type sent to this actor will be delivered via a remote-deployed child actor to the designated target.
All failures of either the local or the remote end-point are escalated to the parent of this actor; there are no specific error cases which are predefined.
See the constructor below for the arguments for this actor. However, prefer using akka.contrib.pattern.ReliableProxy#props to this actor's constructor.
Singleton coordinator that decides where to allocate shards.
This actor creates children entry actors on demand for the shards that it is told to be responsible for. It delegates messages targeted to other shards to the responsible ShardRegion actor on other nodes.
Fast, small, and dirty implementation of a linked list that removes transient work entries once they are processed. The list is not thread safe! However it is expected to be reentrant. This means a processing function can add/remove entries from the list while processing. Most importantly, a processing function can remove its own entry from the list. The first remove must return true and any subsequent removes must return false.
The type of the item
Extension that starts ClusterReceptionist and accompanying DistributedPubSubMediator with settings defined in config section akka.contrib.cluster.receptionist.
The DistributedPubSubMediator is started by the DistributedPubSubExtension.
This extension provides sharding functionality of actors in a cluster. The typical use case is when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. You need to distribute them across several nodes in the cluster and you want to be able to interact with them using their logical identifier, but without having to care about their physical location in the cluster, which might also change over time. It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. Here we call these actors "entries". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.
In this context sharding means that actors with an identifier, so called entries, can be automatically distributed across multiple nodes in the cluster. Each entry actor runs only at one place, and messages can be sent to the entry without requiring the sender to know the location of the destination actor. This is achieved by sending the messages via a ShardRegion actor provided by this extension, which knows how to route the message with the entry id to the final destination.
This extension is supposed to be used by first registering the supported entry types with the ClusterSharding#start method, typically at system startup on each node in the cluster. The ShardRegion actor for a named entry type can then be retrieved with ClusterSharding#shardRegion. Messages to the entries are always sent via the local ShardRegion. Some settings can be configured as described in the akka.contrib.cluster.sharding section of the reference.conf.
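A sketch of registering an entry type and sending a message to an entry (the Counter actor and EntityEnvelope message type are assumptions):
In Scala:
import akka.actor.Props
import akka.contrib.pattern.{ ClusterSharding, ShardRegion }

case class EntityEnvelope(id: Long, payload: Any) // hypothetical message type

val idExtractor: ShardRegion.IdExtractor = {
  case EntityEnvelope(id, payload) => (id.toString, payload)
}
val shardResolver: ShardRegion.ShardResolver = {
  case EntityEnvelope(id, _) => (id % 30).toString // 30 shards
}

val counterRegion = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props(classOf[Counter])), // hypothetical entry actor
  idExtractor = idExtractor,
  shardResolver = shardResolver)

// messages to entries are always sent via the local ShardRegion
counterRegion ! EntityEnvelope(42, "increment")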
The ShardRegion actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ShardRegion is created with two application specific functions to extract the entry identifier and the shard identifier from incoming messages.
A shard is a group of entries that will be managed together. For the first message in a specific shard the ShardRegion requests the location of the shard from a central coordinator, the ShardCoordinator. The ShardCoordinator decides which ShardRegion owns the shard. The ShardRegion receives the decided home of the shard and if that is the ShardRegion instance itself it will create a local child actor representing the entry and direct all messages for that entry to it. If the shard home is another ShardRegion instance messages will be forwarded to that ShardRegion instance instead. While resolving the location of a shard incoming messages for that shard are buffered and later delivered when the shard home is known. Subsequent messages to the resolved shard can be delivered to the target destination immediately without involving the ShardCoordinator.
To make sure that at most one instance of a specific entry actor is running somewhere in the cluster it is important that all nodes have the same view of where the shards are located. Therefore the shard allocation decisions are taken by the central ShardCoordinator, which is running as a cluster singleton, i.e. one instance on the oldest member among all cluster nodes or a group of nodes tagged with a specific role. The oldest member can be determined by akka.cluster.Member#isOlderThan.
The logic that decides where a shard is to be located is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy allocates new shards to the ShardRegion with the least number of previously allocated shards. This strategy can be replaced by an application specific implementation.
To be able to use newly added members in the cluster the coordinator facilitates rebalancing of shards, i.e. migrating entries from one node to another. In the rebalance process the coordinator first notifies all ShardRegion actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location is unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ShardRegion responsible for the rebalanced shard will stop all entries in that shard by sending PoisonPill to them. When all entries have been terminated the ShardRegion owning the entries will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard and thereby allocate a new home for the shard, and then buffered messages in the ShardRegion actors are delivered to the new location. This means that the state of the entries is not transferred or migrated. If the state of the entries is of importance it should be persistent (durable), e.g. with akka-persistence, so that it can be recovered at the new location.
The logic that decides which shards to rebalance is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy picks shards for handoff from the ShardRegion with the most number of previously allocated shards. They will then be allocated to the ShardRegion with the least number of previously allocated shards, i.e. new members in the cluster. There is a configurable threshold of how large the difference must be to begin the rebalancing. This strategy can be replaced by an application specific implementation.
The state of shard locations in the ShardCoordinator is persistent (durable) with akka-persistence to survive failures. Since it is running in a cluster akka-persistence must be configured with a distributed journal. When a crashed or unreachable coordinator node has been removed (via down) from the cluster a new ShardCoordinator singleton actor will take over and the state is recovered. During such a failure period shards with known location are still available, while messages for new (unknown) shards are buffered until the new ShardCoordinator becomes available.
As long as a sender uses the same ShardRegion actor to deliver messages to an entry actor the order of the messages is preserved. As long as the buffer limit is not reached messages are delivered on a best effort basis, with at-most-once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging, with at-least-once semantics, can be added by using channels in akka-persistence.
Some additional latency is introduced for messages targeted to new or previously unused shards due to the round-trip to the coordinator. Rebalancing of shards may also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards.
The ShardRegion actor can also be started in proxy only mode, i.e. it will not host any entries itself, but knows how to delegate messages to the right location. A ShardRegion starts in proxy only mode if the roles of the node do not include the node role specified in the akka.contrib.cluster.sharding.role config property or if the specified entryProps is None/null.
If the state of the entries is persistent you may stop entries that are not used to reduce memory consumption. This is done by the application specific implementation of the entry actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entry when it stops itself the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages the entry actor can send ShardRegion.Passivate to its parent ShardRegion. The specified wrapped message in Passivate will be sent back to the entry, which is then supposed to stop itself. Incoming messages will be buffered by the ShardRegion between reception of Passivate and termination of the entry. Such buffered messages are thereafter delivered to a new incarnation of the entry.
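A sketch of such passivation in the entry actor (the timeout value is an assumption):
In Scala:
import akka.actor.{ Actor, PoisonPill, ReceiveTimeout }
import akka.contrib.pattern.ShardRegion
import scala.concurrent.duration._

class Counter extends Actor {
  context.setReceiveTimeout(120.seconds) // stop the entry when idle

  def receive = {
    case ReceiveTimeout =>
      // the parent ShardRegion buffers incoming messages until this actor stops
      context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
    case msg => // handle domain messages here
  }
}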
Extension that starts a DistributedPubSubMediator actor with settings defined in config section akka.contrib.cluster.pub-sub.
Provides the utility methods and constructors to the WorkList class.
This actor is intended to be used on an external node that is not a member of the cluster. It acts like a gateway for sending messages to actors somewhere in the cluster. From the initial contact points it will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from previous establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.
You can send messages via the ClusterClient to any actor in the cluster that is registered in the ClusterReceptionist. Messages are wrapped in ClusterClient.Send, ClusterClient.SendToAll or ClusterClient.Publish.
1. ClusterClient.Send - The message will be delivered to one recipient with a matching path, if any such exists. If several entries match the path the message will be delivered to one random destination. The sender of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the used receptionist actor, if any such exists, otherwise random to any other matching entry.
2. ClusterClient.SendToAll - The message will be delivered to all recipients with a matching path.
3. ClusterClient.Publish - The message will be delivered to all recipient actors that have been registered as subscribers to the named topic.
Use the factory method ClusterClient#props to create the akka.actor.Props for the actor.
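A sketch of creating and using the client (host names, ports and actor paths are assumptions):
In Scala:
import akka.contrib.pattern.ClusterClient

val initialContacts = Set(
  system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
  system.actorSelection("akka.tcp://OtherSys@host2:2552/user/receptionist"))
val c = system.actorOf(ClusterClient.props(initialContacts), "client")

c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi")
c ! ClusterClient.Publish("topic", "news")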