final class Replicator extends Actor with ActorLogging

A replicated in-memory data store supporting low latency and high availability requirements.

The Replicator actor takes care of direct replication and gossip-based dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in the cluster. The data types must be convergent CRDTs and implement ReplicatedData, i.e. they provide a monotonic merge function and the state changes always converge.
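
To make the "monotonic merge function" requirement concrete, here is a minimal, dependency-free sketch of a grow-only counter. The names `GrowOnlyCounterSketch` and `Counter` are hypothetical and this is not the package's GCounter implementation; it only illustrates why a per-node maximum makes replicas converge.

```scala
// Hypothetical model of a grow-only counter; not Akka's GCounter.
object GrowOnlyCounterSketch {
  // One monotonically increasing count per node identifier.
  final case class Counter(counts: Map[String, Long] = Map.empty) {
    // The counter value is the sum over all nodes.
    def value: Long = counts.values.sum

    // A node only ever increments its own entry.
    def increment(node: String, delta: Long = 1L): Counter = {
      require(delta >= 0, "a grow-only counter cannot be decremented")
      copy(counts = counts.updated(node, counts.getOrElse(node, 0L) + delta))
    }

    // Merge takes the per-node maximum, so it is commutative, associative
    // and idempotent: replicas converge regardless of delivery order.
    def merge(that: Counter): Counter = {
      val keys = counts.keySet ++ that.counts.keySet
      Counter(keys.map { k =>
        k -> math.max(counts.getOrElse(k, 0L), that.counts.getOrElse(k, 0L))
      }.toMap)
    }
  }
}
```

Merging the same state twice, or merging two states in either order, yields the same result, which is the property the Replicator relies on when it gossips state between nodes.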

You can use your own custom ReplicatedData types, and several types, such as GCounter, are provided by this package.

For a good introduction to the CRDT subject, watch the talks The Final Causal Frontier and Eventually Consistent Data Structures by Sean Cribbs and the talk by Mark Shapiro, and read the excellent paper A comprehensive study of Convergent and Commutative Replicated Data Types by Mark Shapiro et al.

The Replicator actor must be started on each node in the cluster, or on each node of a group tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience it can be used with the DistributedData extension.

Update

To modify and replicate a ReplicatedData value you send a Replicator.Update message to the local Replicator. The current data value for the key of the Update is passed as a parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.

The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must not, for example, access the sender() reference of an enclosing actor.

Update is intended to be sent only from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.

You supply a write consistency level which has the following meaning:

  • WriteLocal: the value will immediately be written only to the local replica, and later disseminated with gossip
  • WriteTo(n): the value will immediately be written to at least n replicas, including the local replica
  • WriteMajority: the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • WriteAll: the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
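
The replica counts implied by the levels above can be sketched as a small function. The types `Local`, `To`, `Majority` and `All` are hypothetical stand-ins for the levels listed above, not the Replicator's own classes, and N/2 is assumed to be integer division.

```scala
// Hypothetical stand-ins for the consistency levels; not the Akka classes.
object WriteLevelSketch {
  sealed trait Level
  case object Local extends Level
  final case class To(n: Int) extends Level
  case object Majority extends Level
  case object All extends Level

  // Number of replicas (including the local one) that must hold the value
  // for a cluster, or cluster role group, of the given size.
  def requiredReplicas(level: Level, clusterSize: Int): Int = level match {
    case Local    => 1
    case To(n)    => n
    case Majority => clusterSize / 2 + 1 // integer division
    case All      => clusterSize
  }
}
```

For example, with five nodes a majority is three replicas, and with four nodes it is also three.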

In reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.

You will always see your own writes. For example if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.
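
The following sketch models this behaviour with a plain local map. `LocalReplica` is a hypothetical class holding Int values; it only mirrors the idea that the modify function receives the current value (or None when the key is absent) and that updates are applied one after another on the local replica.

```scala
// Hypothetical local replica; it only models sequential application of modify functions.
object LocalModifySketch {
  final class LocalReplica {
    private var data = Map.empty[String, Int]

    // Applies modify to the current value (None if the key is absent)
    // and stores the result before the next update is processed.
    def update(key: String)(modify: Option[Int] => Int): Int = {
      val updated = modify(data.get(key))
      data = data.updated(key, updated)
      updated
    }
  }
}
```

Calling `update("hits")(_.getOrElse(0) + 1)` twice on the same replica stores 1 and then 2, because the second function is handed the result of the first. The functions used here are pure: they depend only on their argument.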

In the Update message you can pass an optional request context, which the Replicator does not interpret but includes in the reply messages. This is a convenient way to pass contextual information (e.g. the original sender) without having to use ask or local correlation data structures.

Get

To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator. You supply a consistency level which has the following meaning:

  • ReadLocal: the value will only be read from the local replica
  • ReadFrom(n): the value will be read and merged from n replicas, including the local replica
  • ReadMajority: the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
  • ReadAll: the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)
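
"Read and merged from n replicas" can be pictured as folding the merge function over the local value and the values returned by n - 1 other replicas. The sketch below assumes a hypothetical grow-only set of strings whose merge is set union; `readFrom` is an illustrative helper, not part of the Replicator API.

```scala
// Hypothetical read-and-merge over in-memory replica values.
object ReadMergeSketch {
  // A grow-only set: merging two replicas is set union.
  type GrowSet = Set[String]

  def merge(a: GrowSet, b: GrowSet): GrowSet = a union b

  // Merges the local value with the values of n - 1 other replicas,
  // so that n replicas in total (including the local one) contribute.
  def readFrom(local: GrowSet, others: Seq[GrowSet], n: Int): GrowSet = {
    require(n >= 1 && n <= others.size + 1, "n must be between 1 and the number of replicas")
    others.take(n - 1).foldLeft(local)(merge)
  }
}
```

With n = 1 this degenerates to a local read; a larger n includes the state of more replicas in the merged result.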

In reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.

You will always read your own writes. For example, if you send an Update message followed by a Get of the same key, the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages is not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.

In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.

Subscribe

You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.

The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.
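
The periodic notification described above amounts to collecting the keys that changed and delivering them in a batch, either on a timer tick or on an explicit flush. The sketch below is a hypothetical in-memory model using callbacks instead of actors; `Notifier`, `markChanged` and `flush` are illustrative names only.

```scala
// Hypothetical batching of change notifications; callbacks stand in for subscriber actors.
object ChangeNotificationSketch {
  type Callback = String => Unit

  final class Notifier {
    private var subscribers = Map.empty[String, Set[Callback]]
    private var changed = Set.empty[String]

    def subscribe(key: String, callback: Callback): Unit =
      subscribers = subscribers.updated(
        key, subscribers.getOrElse(key, Set.empty[Callback]) + callback)

    // Records that a key was updated; nothing is delivered yet.
    def markChanged(key: String): Unit = changed += key

    // Called on every notification interval tick, or on an explicit flush:
    // each subscriber is notified once per changed key, then the batch is cleared.
    def flush(): Unit = {
      for {
        key <- changed
        callback <- subscribers.getOrElse(key, Set.empty[Callback])
      } callback(key)
      changed = Set.empty
    }
  }
}
```

Several updates to the same key between two flushes result in a single notification, which is the trade-off of notifying on an interval.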

Delete

A data entry can be deleted by sending a Replicator.Delete message to the local Replicator. In reply to the Delete, a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure is sent. Note that ReplicationDeleteFailure does not mean that the delete completely failed or was rolled back. It may still have been replicated to some nodes, and may eventually be replicated to all nodes.

A deleted key cannot be reused, but it is still recommended to delete unused data entries because that reduces the replication overhead when new nodes join the cluster. Subsequent Delete, Update and Get requests for the key will be answered with Replicator.DataDeleted. Subscribers will receive Replicator.DataDeleted.
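
One way to picture why a deleted key cannot be reused is to treat the deletion as a permanent marker stored under the key. The sketch below is a hypothetical single-node model with Int values; the reply objects reuse the names DataDeleted and NotFound from the text above for readability but are local definitions, not the Replicator messages.

```scala
// Hypothetical single-node store in which a delete leaves a permanent marker.
object DeleteSketch {
  sealed trait Reply
  case object Ok extends Reply
  case object DataDeleted extends Reply
  case object NotFound extends Reply
  final case class Found(value: Int) extends Reply

  final class Store {
    // Some(v) is a live value, None marks a deleted key.
    private var entries = Map.empty[String, Option[Int]]

    def update(key: String, value: Int): Reply = entries.get(key) match {
      case Some(None) => DataDeleted
      case _ =>
        entries = entries.updated(key, Some(value))
        Ok
    }

    def get(key: String): Reply = entries.get(key) match {
      case Some(Some(value)) => Found(value)
      case Some(None)        => DataDeleted
      case None              => NotFound
    }

    def delete(key: String): Reply = entries.get(key) match {
      case Some(None) => DataDeleted
      case _ =>
        entries = entries.updated(key, None)
        Ok
    }
  }
}
```

After `delete("k")`, every later `update`, `get` or `delete` for `"k"` in this model returns `DataDeleted`.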

CRDT Garbage

One thing that can be problematic with CRDTs is that some data types accumulate history (garbage). For example a GCounter keeps track of one counter per node. If a GCounter has been updated from one node it will associate the identifier of that node forever. That can become a problem for long running systems with many cluster nodes being added and removed. To solve this problem the Replicator performs pruning of data associated with nodes that have been removed from the cluster. Data types that need pruning have to implement RemovedNodePruning. The pruning consists of several steps:

  • When a node is removed from the cluster, it is first important that all updates that were done by that node are disseminated to all other nodes. The pruning will not start before the maxPruningDissemination duration has elapsed. The time measurement is stopped when any replica is unreachable, so it should be configured for the worst case in a healthy cluster.
  • The nodes are ordered by their address, and the node ordered first is called the leader. The leader initiates the pruning by adding a PruningInitialized marker in the data envelope. This is gossiped to all other nodes and they mark it as seen when they receive it.
  • When the leader sees that all other nodes have seen the PruningInitialized marker, the leader performs the pruning and changes the marker to PruningPerformed so that nobody else will redo the pruning. The data envelope with this pruning state is a CRDT itself. The pruning is typically performed by "moving" the part of the data associated with the removed node to the leader node. For example, a GCounter is a Map with the node as key and the counts done by that node as value. When pruning, the value of the removed node is moved to the entry owned by the leader node. See RemovedNodePruning#prune.
  • Thereafter the data is always cleared from parts associated with the removed node so that it does not come back when merging. See RemovedNodePruning#pruningCleanup.
  • Once another maxPruningDissemination duration has elapsed after pruning the last entry from the removed node, the PruningPerformed markers in the data envelope are collapsed into a single tombstone entry, for efficiency. Clients may continue to use old data and therefore all data are always cleared from parts associated with tombstoned nodes.
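
The "moving" and clean-up steps above can be sketched for a counter represented as a Map from node to count. This is a simplified model: node addresses are plain strings ordered lexicographically (an assumption; the real address ordering is not shown here), and `prune` and `pruningCleanup` are illustrative functions that only borrow their names from RemovedNodePruning.

```scala
// Hypothetical pruning of a map-based counter; not the RemovedNodePruning implementation.
object PruningSketch {
  type Counts = Map[String, Long]

  // The node ordered first by address acts as leader (string ordering is assumed here).
  def leader(nodes: Set[String]): Option[String] = nodes.toSeq.sorted.headOption

  // Moves the count of the removed node to the leader's entry,
  // so the total value of the counter is preserved.
  def prune(counts: Counts, removed: String, leader: String): Counts = {
    require(removed != leader, "the removed node cannot be the leader")
    counts.get(removed) match {
      case Some(n) => (counts - removed).updated(leader, counts.getOrElse(leader, 0L) + n)
      case None    => counts
    }
  }

  // Drops any entry of the removed node that reappears, for example
  // after merging with old data that has not been pruned yet.
  def pruningCleanup(counts: Counts, removed: String): Counts = counts - removed
}
```

Pruning node `"c"` from `Map("a" -> 1, "b" -> 2, "c" -> 4)` with leader `"a"` gives `Map("a" -> 5, "b" -> 2)`: the sum stays 7 while the entry for the removed node disappears.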
Source
Replicator.scala

Instance Constructors

  1. new Replicator(settings: ReplicatorSettings)

Type Members

  1. type Receive = PartialFunction[Any, Unit]
    Definition Classes
    Actor

Value Members

  1. final def !=(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  2. final def ##(): Int
    Definition Classes
    AnyRef → Any
  3. def +(other: String): String
    Implicit
    This member is added by an implicit conversion from Replicator to any2stringadd[Replicator] performed by method any2stringadd in scala.Predef.
    Definition Classes
    any2stringadd
  4. def ->[B](y: B): (Replicator, B)
    Implicit
    This member is added by an implicit conversion from Replicator to ArrowAssoc[Replicator] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
    Annotations
    @inline()
  5. final def ==(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  6. var allReachableClockTime: Long
  7. def aroundPostRestart(reason: Throwable): Unit

    Can be overridden to intercept calls to postRestart. Calls postRestart by default.

    Attributes
    protected[akka]
    Definition Classes
    Actor
  8. def aroundPostStop(): Unit

    Can be overridden to intercept calls to postStop. Calls postStop by default.

    Attributes
    protected[akka]
    Definition Classes
    Actor
  9. def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit

    Can be overridden to intercept calls to preRestart. Calls preRestart by default.

    Attributes
    protected[akka]
    Definition Classes
    Actor
  10. def aroundPreStart(): Unit

    Can be overridden to intercept calls to preStart. Calls preStart by default.

    Attributes
    protected[akka]
    Definition Classes
    Actor
  11. def aroundReceive(receive: actor.Actor.Receive, msg: Any): Unit

    INTERNAL API.

    Can be overridden to intercept calls to this actor's current behavior.

    receive

    current behavior.

    msg

    current message.

    Attributes
    protected[akka]
    Definition Classes
    Actor
  12. final def asInstanceOf[T0]: T0
    Definition Classes
    Any
  13. var changed: Set[String]
  14. val clockTask: Cancellable
  15. def clone(): AnyRef
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  16. val cluster: Cluster
  17. implicit val context: ActorContext

    Stores the context for this actor, including self, and sender. It is implicit to support operations such as forward.

    WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!

    akka.actor.ActorContext is the Scala API. getContext returns an akka.actor.UntypedActorContext, which is the Java API of the actor context.

    Definition Classes
    Actor
  18. var dataEntries: Map[String, (DataEnvelope, Digest)]
  19. def digest(envelope: DataEnvelope): Digest
  20. val durable: Set[String]
  21. val durableStore: ActorRef
  22. val durableWildcards: Set[String]
  23. def ensuring(cond: (Replicator) ⇒ Boolean, msg: ⇒ Any): Replicator
    Implicit
    This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  24. def ensuring(cond: (Replicator) ⇒ Boolean): Replicator
    Implicit
    This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  25. def ensuring(cond: Boolean, msg: ⇒ Any): Replicator
    Implicit
    This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  26. def ensuring(cond: Boolean): Replicator
    Implicit
    This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
    Definition Classes
    Ensuring
  27. final def eq(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  28. def equals(arg0: Any): Boolean
    Definition Classes
    AnyRef → Any
  29. def finalize(): Unit
    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  30. def formatted(fmtstr: String): String
    Implicit
    This member is added by an implicit conversion from Replicator to StringFormat[Replicator] performed by method StringFormat in scala.Predef.
    Definition Classes
    StringFormat
    Annotations
    @inline()
  31. final def getClass(): Class[_]
    Definition Classes
    AnyRef → Any
  32. def getData(key: String): Option[DataEnvelope]
  33. def getDigest(key: String): Digest
  34. val gossipTask: Cancellable
  35. def gossipTo(address: Address): Unit
  36. val hasDurableKeys: Boolean
  37. def hasSubscriber(subscriber: ActorRef): Boolean
  38. def hashCode(): Int
    Definition Classes
    AnyRef → Any
  39. def initRemovedNodePruning(): Unit
  40. def isDurable(key: String): Boolean
  41. final def isInstanceOf[T0]: Boolean
    Definition Classes
    Any
  42. def isLeader: Boolean
  43. def isLocalGet(readConsistency: ReadConsistency): Boolean
  44. def isLocalSender(): Boolean
  45. def isLocalUpdate(writeConsistency: WriteConsistency): Boolean
  46. var leader: Option[Address]
  47. val load: Receive
  48. def log: LoggingAdapter
    Definition Classes
    ActorLogging
  49. def matchingRole(m: Member): Boolean
  50. val maxPruningDisseminationNanos: Long
  51. final def ne(arg0: AnyRef): Boolean
    Definition Classes
    AnyRef
  52. val newSubscribers: HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
  53. var nodes: Set[Address]
  54. val normalReceive: Receive
  55. final def notify(): Unit
    Definition Classes
    AnyRef
  56. final def notifyAll(): Unit
    Definition Classes
    AnyRef
  57. val notifyTask: Cancellable
  58. def performRemovedNodePruning(): Unit
  59. def postRestart(reason: Throwable): Unit

    User overridable callback: By default it calls preStart().

    Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.

    reason

    the Throwable that caused the restart to happen

    Definition Classes
    Actor
    Annotations
    @throws( classOf[Exception] )
  60. def postStop(): Unit

    User overridable callback.

    Is called asynchronously after 'actor.stop()' is invoked. Empty default implementation.

    Definition Classes
    Replicator → Actor
  61. def preRestart(reason: Throwable, message: Option[Any]): Unit

    User overridable callback: By default it disposes of all children and then calls postStop().

    Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.

    reason

    the Throwable that caused the restart to happen

    message

    optionally the current message the actor processed when failing, if applicable

    Definition Classes
    Actor
    Annotations
    @throws( classOf[Exception] )
  62. def preStart(): Unit

    User overridable callback.

    Is called when an Actor is started. Actors are automatically started asynchronously when created. Empty default implementation.

    Definition Classes
    Replicator → Actor
  63. var previousClockTime: Long
  64. def pruningCleanupTombstoned(removed: UniqueAddress, data: ReplicatedData): ReplicatedData
  65. def pruningCleanupTombstoned(data: ReplicatedData): ReplicatedData
  66. def pruningCleanupTombstoned(removed: UniqueAddress, envelope: DataEnvelope): DataEnvelope
  67. def pruningCleanupTombstoned(envelope: DataEnvelope): DataEnvelope
  68. var pruningPerformed: Map[UniqueAddress, Long]
  69. val pruningTask: Cancellable
  70. def receive: actor.Actor.Receive

    This defines the initial actor behavior, it must return a partial function with the actor logic.

    Definition Classes
    Replicator → Actor
  71. def receiveClockTick(): Unit
  72. def receiveDelete(key: KeyR, consistency: WriteConsistency): Unit
  73. def receiveFlushChanges(): Unit
  74. def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit
  75. def receiveGetKeyIds(): Unit
  76. def receiveGetReplicaCount(): Unit
  77. def receiveGossip(updatedData: Map[String, DataEnvelope], sendBack: Boolean): Unit
  78. def receiveGossipTick(): Unit
  79. def receiveLeaderChanged(leaderOption: Option[Address], roleOption: Option[String]): Unit
  80. def receiveMemberRemoved(m: Member): Unit
  81. def receiveMemberUp(m: Member): Unit
  82. def receiveReachable(m: Member): Unit
  83. def receiveRead(key: String): Unit
  84. def receiveReadRepair(key: String, writeEnvelope: DataEnvelope): Unit
  85. def receiveRemovedNodePruningTick(): Unit
  86. def receiveStatus(otherDigests: Map[String, Digest], chunk: Int, totChunks: Int): Unit
  87. def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit
  88. def receiveTerminated(ref: ActorRef): Unit
  89. def receiveUnreachable(m: Member): Unit
  90. def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit
  91. def receiveUpdate(key: KeyR, modify: (Option[ReplicatedData]) ⇒ ReplicatedData, writeConsistency: WriteConsistency, req: Option[Any]): Unit
  92. def receiveWeaklyUpMemberUp(m: Member): Unit
  93. def receiveWrite(key: String, envelope: DataEnvelope): Unit
  94. var removedNodes: Map[UniqueAddress, Long]
  95. def replica(address: Address): ActorSelection
  96. def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address]
  97. implicit final val self: ActorRef

    The 'self' field holds the ActorRef for this actor.

    Can be used to send messages to itself:

    self ! message
    

    Definition Classes
    Actor
  98. val selfAddress: Address
  99. val selfUniqueAddress: UniqueAddress
  100. final def sender(): ActorRef

    The reference sender Actor of the last received message. Is defined if the message was sent from another Actor, else deadLetters in akka.actor.ActorSystem.

    WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!

    Definition Classes
    Actor
  101. val serializer: Serializer
  102. def setData(key: String, envelope: DataEnvelope): Unit
  103. var statusCount: Long
  104. var statusTotChunks: Int
  105. val subscribers: HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
  106. var subscriptionKeys: Map[String, KeyR]
  107. val supervisorStrategy: OneForOneStrategy
    Definition Classes
    Replicator → Actor
  108. final def synchronized[T0](arg0: ⇒ T0): T0
    Definition Classes
    AnyRef
  109. def toString(): String
    Definition Classes
    AnyRef → Any
  110. var tombstoneNodes: Set[UniqueAddress]
  111. def tombstoneRemovedNodePruning(): Unit
  112. def unhandled(message: Any): Unit

    User overridable callback.

    Is called when a message isn't handled by the current behavior of the actor. By default it either fails with an akka.actor.DeathPactException (in case of an unhandled akka.actor.Terminated message) or publishes an akka.actor.UnhandledMessage to the actor's system's akka.event.EventStream.

    Definition Classes
    Actor
  113. var unreachable: Set[Address]
  114. final def wait(): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  115. final def wait(arg0: Long, arg1: Int): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  116. final def wait(arg0: Long): Unit
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  117. var weaklyUpNodes: Set[Address]
  118. def write(key: String, writeEnvelope: DataEnvelope): Option[DataEnvelope]
  119. def →[B](y: B): (Replicator, B)
    Implicit
    This member is added by an implicit conversion from Replicator to ArrowAssoc[Replicator] performed by method ArrowAssoc in scala.Predef.
    Definition Classes
    ArrowAssoc
