package akka.contrib.pattern

Type Members

  1. trait Aggregator extends AnyRef

    Mix this trait into an actor to give it aggregator behavior.

  2. class ClusterClient extends Actor with Stash with ActorLogging

    This actor is intended to be used on an external node that is not a member of the cluster. It acts like a gateway for sending messages to actors somewhere in the cluster. From the initial contact points it will establish a connection to a ClusterReceptionist somewhere in the cluster. It will monitor the connection to the receptionist and establish a new connection if the link goes down. When looking for a new receptionist it uses fresh contact points retrieved from the previous connection establishment, or periodically refreshed contacts, i.e. not necessarily the initial contact points.

    You can send messages via the ClusterClient to any actor in the cluster that is registered in the ClusterReceptionist. Messages are wrapped in ClusterClient.Send, ClusterClient.SendToAll or ClusterClient.Publish.

    1. ClusterClient.Send - The message will be delivered to one recipient with a matching path, if any such exists. If several entries match the path, the message will be delivered to one random destination. The sender of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the receptionist actor in use, if any such exists, and otherwise to a random matching entry.

    2. ClusterClient.SendToAll - The message will be delivered to all recipients with a matching path.

    3. ClusterClient.Publish - The message will be delivered to all recipient actors that have been registered as subscribers to the named topic.

    Use the factory method ClusterClient#props to create the akka.actor.Props for the actor.

  3. class ClusterReceptionist extends Actor with ActorLogging

    ClusterClient connects to this actor to reach the cluster. The ClusterReceptionist is supposed to be started on all nodes, or all nodes with a specified role, in the cluster. The receptionist can be started with the ClusterReceptionistExtension or as an ordinary actor (use the factory method ClusterReceptionist#props).

    The receptionist forwards messages from the client to the associated DistributedPubSubMediator, i.e. the client can send messages to any actor in the cluster that is registered in the DistributedPubSubMediator. Messages from the client are wrapped in DistributedPubSubMediator.Send, DistributedPubSubMediator.SendToAll or DistributedPubSubMediator.Publish with the semantics described in DistributedPubSubMediator.

    Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. the sender, as seen by the destination actor, is not the client itself. The sender of the response messages, as seen by the client, is preserved as the original sender, so the client can choose to send subsequent messages directly to the actor in the cluster.

  4. class ClusterReceptionistExtension extends Extension

  5. class ClusterSharding extends Extension

  6. class ClusterSingletonManager extends Actor with FSM[State, Data]

    Manages singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. At most one singleton instance is running at any point in time.

    The ClusterSingletonManager is supposed to be started on all nodes, or all nodes with specified role, in the cluster with actorOf. The actual singleton is started on the oldest node by creating a child actor from the supplied singletonProps.

    The singleton actor is always running on the oldest member, which can be determined by akka.cluster.Member#isOlderThan. The oldest member can change when members are removed. A graceful hand-over can normally be performed when the current oldest node is leaving the cluster. Be aware that there is a short time period when there is no active singleton during the hand-over process.

    The cluster failure detector will notice when the oldest node becomes unreachable due to things like a JVM crash, hard shutdown, or network failure. When the crashed node has been removed (via down) from the cluster, a new oldest node will take over and a new singleton actor is created. For these failure scenarios there will not be a graceful hand-over, but more than one active singleton is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts.

    You access the singleton actor with actorSelection using the names you specified when creating the ClusterSingletonManager. You can subscribe to akka.cluster.ClusterEvent.MemberEvent and sort the members by age (akka.cluster.Member#isOlderThan) to keep track of the oldest member. Alternatively the singleton actor may broadcast its existence when it is started.
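    The age-based tracking described above can be modelled without Akka. The following Java sketch is illustrative only: MemberInfo, its upNumber field and OldestMemberTracker are hypothetical stand-ins for cluster members and member events, not part of the Akka API, and the assumption that a lower upNumber means an older member belongs to this model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a cluster member; not an Akka class.
final class MemberInfo {
    final String address;
    final int upNumber; // assumed: a lower value means the member joined earlier

    MemberInfo(String address, int upNumber) {
        this.address = address;
        this.upNumber = upNumber;
    }

    // Plays the role of Member#isOlderThan in this simplified model.
    boolean isOlderThan(MemberInfo other) {
        return upNumber < other.upNumber;
    }
}

// Keeps members sorted by age so the oldest one is always at the front.
final class OldestMemberTracker {
    private final List<MemberInfo> membersByAge = new ArrayList<>();

    // Call when a member joins; re-sorts the list oldest first.
    void memberUp(MemberInfo m) {
        membersByAge.add(m);
        membersByAge.sort((a, b) -> a.isOlderThan(b) ? -1 : (b.isOlderThan(a) ? 1 : 0));
    }

    // Call when a member is removed; the oldest member may change as a result.
    void memberRemoved(String address) {
        membersByAge.removeIf(m -> m.address.equals(address));
    }

    // The node expected to host the singleton, or null if no member is known.
    MemberInfo oldest() {
        return membersByAge.isEmpty() ? null : membersByAge.get(0);
    }
}
```

    In a real application the two update methods would be driven by the member events mentioned above, and oldest() would supply the address used to build the actorSelection.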

    Use the factory method ClusterSingletonManager#props to create the akka.actor.Props for the actor.

    Arguments

    singletonProps - akka.actor.Props of the singleton actor instance.

    singletonName - The actor name of the child singleton actor.

    terminationMessage - When handing over to a new oldest node, this terminationMessage is sent to the singleton actor to tell it to finish its work, close resources, and stop. The hand-over to the new oldest node is completed when the singleton actor is terminated. Note that akka.actor.PoisonPill is a perfectly fine terminationMessage if you only need to stop the actor.

    role - Singleton among the nodes tagged with the specified role. If the role is not specified, it is a singleton among all nodes in the cluster.

    maxHandOverRetries - When a node becomes the oldest it sends a hand-over request to the previous oldest node. This is retried with the retryInterval until the previous oldest confirms that the hand-over has started, or until this maxHandOverRetries limit has been reached. If the retry limit is reached, the node decides to be the new oldest if the previous oldest is unknown (typically removed); otherwise it initiates a new round by throwing akka.contrib.pattern.ClusterSingletonManagerIsStuck and expecting a restart with fresh state. For a cluster with many members you might need to increase this retry limit because it takes longer to propagate changes across all nodes.

    maxTakeOverRetries - When an oldest node is no longer the oldest it sends a take-over request to the new oldest node to initiate the normal hand-over process. This is especially useful when a new node joins and becomes oldest immediately, without knowing which node was previously the oldest. This is retried with the retryInterval until this retry limit has been reached. If the retry limit is reached, it initiates a new round by throwing akka.contrib.pattern.ClusterSingletonManagerIsStuck and expecting a restart with fresh state. This will also cause the singleton actor to be stopped. maxTakeOverRetries must be less than maxHandOverRetries to ensure that, in certain corner cases, the new oldest node does not start the singleton actor before the previous one is stopped.

  7. class ClusterSingletonManagerIsStuck extends AkkaException

    Thrown when a consistent state can't be determined within the defined retry limits. Eventually it will reach a stable state and can continue, and that is simplified by starting over with a clean state. The parent supervisor should typically restart the actor, which is the default supervision decision.

  8. class ClusterSingletonProxy extends Actor with Stash with ActorLogging

    The ClusterSingletonProxy works together with the akka.contrib.pattern.ClusterSingletonManager to provide a distributed proxy to the singleton actor.

    The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash them when the singleton is finally available. The proxy mixes in the akka.actor.Stash trait, so it can be configured accordingly.

    The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by periodically sending an akka.actor.Identify message until the singleton responds with its akka.actor.ActorIdentity.

    Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the actors involved.
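    The stash-and-forward behavior described above can be pictured with a small single-threaded model. This Java sketch is illustrative only: BufferingProxySketch and its methods are hypothetical, plain strings stand in for messages, and a list stands in for the singleton's mailbox. It does not reproduce the proxy's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Simplified model of the proxy's buffering; not the Akka implementation.
final class BufferingProxySketch {
    private final Queue<String> stashed = new ArrayDeque<>();
    private List<String> singletonInbox = null; // null while the singleton is not identified

    // Forward directly when the singleton is known, otherwise keep the message for later.
    void send(String message) {
        if (singletonInbox == null) {
            stashed.add(message);
        } else {
            singletonInbox.add(message);
        }
    }

    // Models receiving the identity reply: flush stashed messages in arrival order.
    void singletonIdentified(List<String> inbox) {
        singletonInbox = inbox;
        while (!stashed.isEmpty()) {
            singletonInbox.add(stashed.poll());
        }
    }

    // Models a change of oldest member: messages are stashed again until re-identification.
    void singletonLost() {
        singletonInbox = null;
    }

    int stashedCount() {
        return stashed.size();
    }
}
```

    The model keeps every stashed message, whereas a real stash is bounded by its mailbox configuration, which is one of the ways messages can be lost in practice.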

  9. class DistributedPubSubExtension extends Extension

  10. class DistributedPubSubMediator extends Actor with ActorLogging

    This actor manages a registry of actor references and replicates the entries to peer actors among all cluster nodes or a group of nodes tagged with a specific role.

    The DistributedPubSubMediator is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The mediator can be started with the DistributedPubSubExtension or as an ordinary actor.

    Each node changes only its own part of the registry, and those changes are versioned. Deltas are disseminated in a scalable way to other nodes with a gossip protocol. The registry is eventually consistent, i.e. changes are not immediately visible at other nodes, but typically they will be fully replicated to all other nodes after a few seconds.

    You can send messages via the mediator on any node to registered actors on any other node. There are three modes of message delivery; Publish additionally has a per-group variant.

    1. DistributedPubSubMediator.Send - The message will be delivered to one recipient with a matching path, if any such exists in the registry. If several entries match the path, the message will be sent via the supplied routingLogic (default random) to one destination. The sender of the message can specify that local affinity is preferred, i.e. the message is sent to an actor in the same local actor system as the mediator actor in use, if any such exists, and otherwise routed to any other matching entry. A typical usage of this mode is private chat to one other user in an instant messaging application. It can also be used for distributing tasks to registered workers, like a cluster-aware router where the routees can register themselves dynamically.

    2. DistributedPubSubMediator.SendToAll - The message will be delivered to all recipients with a matching path. Actors with the same path, without address information, can be registered on different nodes. On each node there can only be one such actor, since the path is unique within one local actor system. Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, for redundancy.

    3. DistributedPubSubMediator.Publish - Actors may be registered to a named topic instead of path. This enables many subscribers on each node. The message will be delivered to all subscribers of the topic. For efficiency the message is sent over the wire only once per node (that has a matching topic), and then delivered to all subscribers of the local topic representation. This is the true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging application.

    4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup - Actors may be subscribed to a named topic with an optional property group. If subscribing with a group name, each message published to a topic with the sendOneMessageToEachGroup flag is delivered via the supplied routingLogic (default random) to one actor within each subscribing group. If all the subscribed actors have the same group name, then this works just like DistributedPubSubMediator.Send and all messages are delivered to one subscriber. If all the subscribed actors have different group names, then this works like normal DistributedPubSubMediator.Publish and all messages are broadcast to all subscribers.
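    The difference between a normal Publish and a Publish with sendOneMessageToEachGroup can be made concrete with a small model of one topic. This Java sketch is illustrative only: TopicSketch and its methods are hypothetical, subscribers are plain strings, every subscriber is assumed to have a group, and random selection stands in for the default routingLogic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Illustrative model of delivery for a single topic; names and types are not Akka API.
final class TopicSketch {
    // group name -> subscribers in that group (every subscriber has a group in this model)
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final Random random = new Random();

    void subscribe(String group, String subscriber) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(subscriber);
    }

    // Returns the subscribers that would receive one published message.
    List<String> publish(boolean sendOneMessageToEachGroup) {
        List<String> recipients = new ArrayList<>();
        for (List<String> members : groups.values()) {
            if (sendOneMessageToEachGroup) {
                // One randomly chosen member per group.
                recipients.add(members.get(random.nextInt(members.size())));
            } else {
                // Normal publish: every subscriber receives the message.
                recipients.addAll(members);
            }
        }
        return recipients;
    }
}
```

    With a single group the flag yields exactly one recipient, and with one subscriber per group it yields all of them, matching the two limiting cases described above.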

    You register actors to the local mediator with DistributedPubSubMediator.Put or DistributedPubSubMediator.Subscribe. Put is used together with Send and SendToAll message delivery modes. The ActorRef in Put must belong to the same local actor system as the mediator. Subscribe is used together with Publish. Actors are automatically removed from the registry when they are terminated, or you can explicitly remove entries with DistributedPubSubMediator.Remove or DistributedPubSubMediator.Unsubscribe.

    Successful Subscribe and Unsubscribe are acknowledged with DistributedPubSubMediator.SubscribeAck and DistributedPubSubMediator.UnsubscribeAck replies.

  11. trait DistributedPubSubMessage extends Serializable

    Marker trait for remote messages with special serializer.

  12. class ReliableProxy extends Actor with LoggingFSM[State, Vector[Message]] with ReliableProxyDebugLogging

    A ReliableProxy is a means to wrap a remote actor reference in order to obtain certain improved delivery guarantees:

    • as long as the proxy is not terminated before it sends all of its queued messages then no messages will be lost
    • messages re-sent due to the first point will not be delivered out of order; message ordering is preserved

    These guarantees are valid for the communication between the two end-points of the reliable “tunnel”, which usually spans an unreliable network.

    Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.

    Delivery from the remote end-point to the target actor is still subject to in-JVM delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory situations or other VM errors).
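    One common way to obtain resend-without-reordering guarantees of this kind is to number messages, keep them queued until acknowledged, and accept only the next expected number at the receiving end. The following Java sketch shows that general technique under simplifying assumptions; TunnelSender, TunnelReceiver and Envelope are hypothetical names, and the sketch does not claim to mirror the internals of ReliableProxy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sending end of the modelled tunnel: keeps every message until it is acknowledged.
final class TunnelSender {
    static final class Envelope {
        final int serial;
        final String payload;
        Envelope(int serial, String payload) { this.serial = serial; this.payload = payload; }
    }

    private final Deque<Envelope> unacked = new ArrayDeque<>();
    private int nextSerial = 1;

    // Queue the message and return the envelope to put on the (unreliable) wire.
    Envelope send(String payload) {
        Envelope e = new Envelope(nextSerial++, payload);
        unacked.addLast(e);
        return e;
    }

    // Drop everything up to and including the acknowledged serial.
    void onAck(int serial) {
        while (!unacked.isEmpty() && unacked.peekFirst().serial <= serial) {
            unacked.removeFirst();
        }
    }

    // Envelopes to retransmit after a timeout, oldest first so that order is preserved.
    List<Envelope> pendingResend() {
        return new ArrayList<>(unacked);
    }
}

// Receiving end: delivers only the next expected serial, so resends cannot reorder messages.
final class TunnelReceiver {
    final List<String> delivered = new ArrayList<>();
    private int lastSerial = 0;

    // Returns the serial to acknowledge, or -1 when a gap is detected and nothing is acknowledged.
    int receive(TunnelSender.Envelope e) {
        if (e.serial == lastSerial + 1) {
            delivered.add(e.payload);
            lastSerial = e.serial;
            return e.serial;
        }
        // An already seen serial is acknowledged again; a serial beyond the gap is ignored.
        return e.serial <= lastSerial ? e.serial : -1;
    }
}
```

    In this model a lost envelope leaves a gap, later envelopes are ignored until the sender retransmits its queue, and the messages still arrive in their original order.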

    You can create a reliable connection like this:

    In Scala:

    val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds))

    or in Java:

    final ActorRef proxy = getContext().actorOf(ReliableProxy.props(
      target, Duration.create(100, "millis"), Duration.create(120, "seconds")));

    Please note: the tunnel is uni-directional, and original sender information is retained, hence replies by the wrapped target reference will go back in the normal “unreliable” way unless also secured by a ReliableProxy from the remote end.

    Message Types

    This actor is an akka.actor.FSM, hence it offers the service of transition callbacks to those actors which subscribe using the SubscribeTransitionCallBack and UnsubscribeTransitionCallBack messages; see akka.actor.FSM for more documentation. The proxy will transition into the ReliableProxy.Active state when ACKs are outstanding and return to the ReliableProxy.Idle state when every message sent so far has been confirmed by the peer end-point.

    The initial state of the proxy is ReliableProxy.Connecting. In this state the proxy will repeatedly send akka.actor.Identify messages to ActorSelection(targetPath) in order to obtain a new ActorRef for the target. When an akka.actor.ActorIdentity for the target is received, a new tunnel will be created, a ReliableProxy.TargetChanged message containing the target ActorRef will be sent to the proxy's transition subscribers, and the proxy will transition into either the ReliableProxy.Idle or the ReliableProxy.Active state, depending on whether there are any outstanding messages that need to be delivered. If maxConnectAttempts is defined, this actor will stop itself after Identify has been sent maxConnectAttempts times.

    While in the Idle or Active states, if a communication failure causes the tunnel to terminate via Remote Deathwatch the proxy will transition into the ReliableProxy.Connecting state as described above. After reconnecting TargetChanged will be sent only if the target ActorRef has changed.

    If this actor is stopped and it still has outstanding messages a ReliableProxy.ProxyTerminated message will be sent to the transition subscribers. It contains an Unsent object with the outstanding messages.

    If a ReliableProxy.Unsent message is sent to this actor, the messages contained within it will be relayed through the tunnel to the target.

    Any other message type sent to this actor will be delivered via a remote-deployed child actor to the designated target.

    Failure Cases

    All failures of either the local or the remote end-point are escalated to the parent of this actor; there are no specific error cases which are predefined.

    Arguments

    See the constructor below for the arguments for this actor. However, prefer using akka.contrib.pattern.ReliableProxy#props to this actor's constructor.

  13. class ShardCoordinator extends PersistentActor with ActorLogging

    Singleton coordinator that decides where to allocate shards.

    See also

    ClusterSharding extension

  14. class ShardCoordinatorSupervisor extends Actor

  15. class ShardRegion extends Actor with ActorLogging

    This actor creates children entry actors on demand for the shards that it is told to be responsible for. It delegates messages targeted to other shards to the responsible ShardRegion actor on other nodes.

    See also

    ClusterSharding extension

  16. class WorkList[T] extends AnyRef

    Fast, small, and dirty implementation of a linked list that removes transient work entries once they are processed. The list is not thread safe! However it is expected to be reentrant. This means a processing function can add/remove entries from the list while processing. Most important, a processing function can remove its own entry from the list. The first remove must return true and any subsequent removes must return false.

    T

    The type of the item
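    The reentrancy contract described for WorkList (callbacks may add or remove entries during processing, the first remove returns true and later ones false) can be illustrated with a minimal linked list. This Java sketch was written for illustration: WorkListSketch and its method signatures are assumptions and are not taken from the akka.contrib WorkList source.

```java
import java.util.function.Predicate;

// Minimal reentrant work list for illustration; not thread safe, like the original.
final class WorkListSketch<T> {
    private static final class Entry<T> {
        final T ref;
        final boolean permanent;
        Entry<T> next;
        boolean detached; // set once the entry has been unlinked
        Entry(T ref, boolean permanent) { this.ref = ref; this.permanent = permanent; }
    }

    private final Entry<T> head = new Entry<>(null, true); // sentinel, never processed
    private Entry<T> tail = head;

    void add(T ref, boolean permanent) {
        Entry<T> e = new Entry<>(ref, permanent);
        tail.next = e;
        tail = e;
    }

    // Removes the first entry equal to ref; true only for the call that unlinks it.
    boolean remove(T ref) {
        for (Entry<T> cur = head.next; cur != null; cur = cur.next) {
            if (cur.ref.equals(ref)) return unlink(cur);
        }
        return false;
    }

    private boolean unlink(Entry<T> target) {
        for (Entry<T> prev = head; prev.next != null; prev = prev.next) {
            if (prev.next == target) {
                prev.next = target.next; // target keeps its own next pointer for ongoing traversals
                if (tail == target) tail = prev;
                target.detached = true;
                return true;
            }
        }
        return false; // already unlinked
    }

    // Applies fn to each entry; transient entries are removed once fn reports them processed.
    // fn may call add or remove on this list, including removing its own entry.
    void process(Predicate<T> fn) {
        for (Entry<T> cur = head.next; cur != null; cur = cur.next) {
            if (cur.detached) continue; // removed by an earlier callback
            boolean processed = fn.test(cur.ref);
            if (processed && !cur.permanent) unlink(cur);
        }
    }
}
```

    Because an unlinked entry keeps its forward pointer, a traversal that is standing on an entry when the callback removes it can still continue to the rest of the list.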

Value Members

  1. object ClusterClient

  2. object ClusterReceptionist

  3. object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider

    Extension that starts ClusterReceptionist and accompanying DistributedPubSubMediator with settings defined in config section akka.contrib.cluster.receptionist. The DistributedPubSubMediator is started by the DistributedPubSubExtension.

  4. object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider

    This extension provides sharding functionality of actors in a cluster. The typical use case is when you have many stateful actors that together consume more resources (e.g. memory) than fit on one machine. You need to distribute them across several nodes in the cluster and you want to be able to interact with them using their logical identifier, but without having to care about their physical location in the cluster, which might also change over time. It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology. Here we call these actors "entries". These actors typically have persistent (durable) state, but this feature is not limited to actors with persistent state.

    In this context sharding means that actors with an identifier, so called entries, can be automatically distributed across multiple nodes in the cluster. Each entry actor runs only at one place, and messages can be sent to the entry without requiring the sender to know the location of the destination actor. This is achieved by sending the messages via a ShardRegion actor provided by this extension, which knows how to route the message with the entry id to the final destination.

    To use this extension, first register the supported entry types with the ClusterSharding#start method, typically at system startup on each node in the cluster. The ShardRegion actor for a named entry type can then be retrieved with ClusterSharding#shardRegion. Messages to the entries are always sent via the local ShardRegion. Some settings can be configured as described in the akka.contrib.cluster.sharding section of the reference.conf.

    The ShardRegion actor is started on each node in the cluster, or group of nodes tagged with a specific role. The ShardRegion is created with two application-specific functions to extract the entry identifier and the shard identifier from incoming messages. A shard is a group of entries that will be managed together. For the first message in a specific shard the ShardRegion requests the location of the shard from a central coordinator, the ShardCoordinator. The ShardCoordinator decides which ShardRegion owns the shard. The ShardRegion receives the decided home of the shard, and if that is the ShardRegion instance itself it will create a local child actor representing the entry and direct all messages for that entry to it. If the shard home is another ShardRegion instance, messages will be forwarded to that ShardRegion instance instead. While the location of a shard is being resolved, incoming messages for that shard are buffered and later delivered when the shard home is known. Subsequent messages to the resolved shard can be delivered to the target destination immediately without involving the ShardCoordinator.
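    The two application-specific functions mentioned above can be as simple as reading an identifier from the message and hashing it. This Java sketch is one possible shape, not the extension's API: EntryEnvelope, ShardFunctionsSketch and the shard count of 10 are assumptions made for the example.

```java
// Hypothetical application message that carries the identifier of its target entry.
final class EntryEnvelope {
    final String entryId;
    final Object payload;
    EntryEnvelope(String entryId, Object payload) { this.entryId = entryId; this.payload = payload; }
}

final class ShardFunctionsSketch {
    static final int NUMBER_OF_SHARDS = 10; // assumed value for the example

    // Entry identifier: taken directly from the envelope.
    static String entryIdOf(EntryEnvelope msg) {
        return msg.entryId;
    }

    // Shard identifier: a stable hash of the entry id, so an entry always maps to the same shard.
    static String shardIdOf(EntryEnvelope msg) {
        return String.valueOf(Math.floorMod(msg.entryId.hashCode(), NUMBER_OF_SHARDS));
    }
}
```

    Math.floorMod keeps the result non-negative even when hashCode is negative. The shard count is a design choice: it bounds how finely entries can be spread across nodes.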

    To make sure that at most one instance of a specific entry actor is running somewhere in the cluster it is important that all nodes have the same view of where the shards are located. Therefore the shard allocation decisions are taken by the central ShardCoordinator, which is running as a cluster singleton, i.e. one instance on the oldest member among all cluster nodes or a group of nodes tagged with a specific role. The oldest member can be determined by akka.cluster.Member#isOlderThan.

    The logic that decides where a shard is to be located is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy allocates new shards to the ShardRegion with the fewest previously allocated shards. This strategy can be replaced by an application-specific implementation.
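    The allocation rule described above amounts to picking the region with the smallest current shard count. The following Java sketch models that rule with plain collections; LeastShardsAllocationSketch and the use of region names as strings are assumptions, and the tie-breaking shown here is a choice of this sketch rather than documented behavior.

```java
import java.util.List;
import java.util.Map;

final class LeastShardsAllocationSketch {
    // Returns the region that currently holds the fewest shards; ties go to the first one found.
    static String allocate(Map<String, List<String>> shardsByRegion) {
        String best = null;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<String, List<String>> e : shardsByRegion.entrySet()) {
            if (e.getValue().size() < bestCount) {
                best = e.getKey();
                bestCount = e.getValue().size();
            }
        }
        return best; // null when no region is registered
    }
}
```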

    To be able to use newly added members in the cluster, the coordinator facilitates rebalancing of shards, i.e. migrating entries from one node to another. In the rebalance process the coordinator first notifies all ShardRegion actors that a handoff for a shard has started. That means they will start buffering incoming messages for that shard, in the same way as if the shard location were unknown. During the rebalance process the coordinator will not answer any requests for the location of shards that are being rebalanced, i.e. local buffering will continue until the handoff is completed. The ShardRegion responsible for the rebalanced shard will stop all entries in that shard by sending PoisonPill to them. When all entries have been terminated, the ShardRegion owning the entries will acknowledge the handoff as completed to the coordinator. Thereafter the coordinator will reply to requests for the location of the shard and thereby allocate a new home for the shard, and the buffered messages in the ShardRegion actors are then delivered to the new location. This means that the state of the entries is not transferred or migrated. If the state of the entries is important it should be persistent (durable), e.g. with akka-persistence, so that it can be recovered at the new location.

    The logic that decides which shards to rebalance is defined in a pluggable shard allocation strategy. The default implementation ShardCoordinator.LeastShardAllocationStrategy picks shards for handoff from the ShardRegion with the most previously allocated shards. They will then be allocated to the ShardRegion with the fewest previously allocated shards, i.e. new members in the cluster. There is a configurable threshold of how large the difference must be to begin the rebalancing. This strategy can be replaced by an application-specific implementation.
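    The rebalancing decision described above can be sketched as a comparison between the fullest and the emptiest region. This Java model is illustrative only: RebalanceSketch is a hypothetical name, and two details are assumptions of the sketch rather than statements from this page, namely that the threshold is compared with "greater than or equal" and that a single shard is handed off per round.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class RebalanceSketch {
    // Returns the shards to hand off: one shard from the fullest region, or none if the
    // difference between the fullest and the emptiest region is below the threshold.
    static List<String> shardsToRebalance(Map<String, List<String>> shardsByRegion, int threshold) {
        List<String> most = null;
        int least = Integer.MAX_VALUE;
        for (List<String> shards : shardsByRegion.values()) {
            if (most == null || shards.size() > most.size()) most = shards;
            least = Math.min(least, shards.size());
        }
        if (most == null || most.isEmpty() || most.size() - least < threshold) {
            return Collections.emptyList();
        }
        return Collections.singletonList(most.get(0));
    }
}
```

    The shard returned here would then be allocated again by the allocation rule, which places it on the region with the fewest shards.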

    The state of shard locations in the ShardCoordinator is persistent (durable) with akka-persistence to survive failures. Since it is running in a cluster akka-persistence must be configured with a distributed journal. When a crashed or unreachable coordinator node has been removed (via down) from the cluster a new ShardCoordinator singleton actor will take over and the state is recovered. During such a failure period shards with known location are still available, while messages for new (unknown) shards are buffered until the new ShardCoordinator becomes available.

    As long as a sender uses the same ShardRegion actor to deliver messages to an entry actor, the order of the messages is preserved. As long as the buffer limit is not reached, messages are delivered on a best-effort basis, with at-most-once delivery semantics, in the same way as ordinary message sending. Reliable end-to-end messaging with at-least-once semantics can be added by using channels in akka-persistence.

    Some additional latency is introduced for messages targeted to new or previously unused shards due to the round-trip to the coordinator. Rebalancing of shards may also add latency. This should be considered when designing the application specific shard resolution, e.g. to avoid too fine grained shards.

    The ShardRegion actor can also be started in proxy-only mode, i.e. it will not host any entries itself, but knows how to delegate messages to the right location. A ShardRegion starts in proxy-only mode if the roles of the node do not include the node role specified in the akka.contrib.cluster.sharding.role config property or if the specified entryProps is None/null.

    If the state of the entries is persistent you may stop entries that are not used to reduce memory consumption. This is done by the application-specific implementation of the entry actors, for example by defining a receive timeout (context.setReceiveTimeout). If a message is already enqueued to the entry when it stops itself, the enqueued message in the mailbox will be dropped. To support graceful passivation without losing such messages, the entry actor can send ShardRegion.Passivate to its parent ShardRegion. The specified wrapped message in Passivate will be sent back to the entry, which is then supposed to stop itself. Incoming messages will be buffered by the ShardRegion between reception of Passivate and termination of the entry. Such buffered messages are thereafter delivered to a new incarnation of the entry.

  5. object ClusterSingletonManager

  6. object ClusterSingletonProxy

  7. object DistributedPubSubExtension extends ExtensionId[DistributedPubSubExtension] with ExtensionIdProvider

    Extension that starts a DistributedPubSubMediator actor with settings defined in config section akka.contrib.cluster.pub-sub.

  8. object DistributedPubSubMediator

  9. object ReliableProxy

  10. object ShardCoordinator

  11. object ShardCoordinatorSupervisor

  12. object ShardRegion

  13. object WorkList

    Provides the utility methods and constructors to the WorkList class.

  14. package protobuf
