final class Replicator extends Actor with ActorLogging
A replicated in-memory data store supporting low latency and high availability requirements. The Replicator actor takes care of direct replication and gossip-based dissemination of Conflict Free Replicated Data Types (CRDTs) to replicas in the cluster.
The data types must be convergent CRDTs and implement ReplicatedData, i.e.
they provide a monotonic merge function and the state changes always converge.
You can use your own custom ReplicatedData or DeltaReplicatedData types, and several types are provided by this package, such as:
- Counters: GCounter, PNCounter
- Registers: LWWRegister, Flag
- Sets: GSet, ORSet
- Maps: ORMap, ORMultiMap, LWWMap, PNCounterMap
For a good introduction to the CRDT subject, watch the Eventually Consistent Data Structures talk by Sean Cribbs and the talk by Mark Shapiro, and read the excellent paper A comprehensive study of Convergent and Commutative Replicated Data Types by Mark Shapiro et al.
The Replicator actor must be started on each node in the cluster, or group of nodes tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience it can be used with the DistributedData extension, but it can also be started as an ordinary actor using Replicator.props. If it is started as an ordinary actor it is important that it is given the same name, and is thus started on the same path, on all nodes.
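As an illustrative sketch (the system and actor names are assumptions, and a cluster configuration is assumed to be in place), the Replicator can be obtained through the DistributedData extension or started explicitly:

import akka.actor.{ ActorRef, ActorSystem }
import akka.cluster.ddata.{ DistributedData, Replicator, ReplicatorSettings }

val system = ActorSystem("ClusterSystem") // hypothetical system name

// Preferred: the DistributedData extension starts a Replicator using the
// settings from the akka.cluster.distributed-data configuration section.
val replicator: ActorRef = DistributedData(system).replicator

// Alternative: start it as an ordinary actor; the name, and therefore the
// path, must be the same on all nodes.
val ownReplicator: ActorRef = system.actorOf(
  Replicator.props(ReplicatorSettings(system)),
  name = "ddataReplicator")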
Delta State Replicated Data Types are supported. A delta-CRDT is a way to reduce the need for sending the full state for updates. For example, adding elements 'c' and 'd' to the set {'a', 'b'} would result in sending only the delta {'c', 'd'} and merging that with the state on the receiving side, resulting in the set {'a', 'b', 'c', 'd'}.
The protocol for replicating the deltas supports causal consistency if the data type is marked with RequiresCausalDeliveryOfDeltas. Otherwise it is only eventually consistent. Without causal consistency, if elements 'c' and 'd' are added in two separate Update operations these deltas may occasionally be propagated to nodes in a different order than the causal order of the updates. In this example that means the set {'a', 'b', 'd'} can be seen before element 'c' is seen. Eventually it will be {'a', 'b', 'c', 'd'}.
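The deltas are produced by the data types themselves. A minimal sketch, reusing the system value from the sketch above and assuming the implicit SelfUniqueAddress provided by the DistributedData extension:

import akka.cluster.ddata.{ DistributedData, ORSet, SelfUniqueAddress }

implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress

val s0 = ORSet.empty[String] :+ "a" :+ "b" // {'a', 'b'}

// resetDelta starts a fresh delta group; the two additions below are
// recorded as the delta {'c', 'd'}
val s1 = s0.resetDelta :+ "c" :+ "d"

// only the delta needs to be propagated; merging it with the old state on
// the receiving side yields the full set {'a', 'b', 'c', 'd'}
val merged = s1.delta match {
  case Some(d) => s0.mergeDelta(d)
  case None    => s0
}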
Update
To modify and replicate a ReplicatedData value you send a Replicator.Update message to the local Replicator.
The current data value for the key of the Update is passed as parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.
The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must, for example, not access the sender() reference of an enclosing actor.
Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.
You supply a write consistency level which has the following meaning:
- WriteLocal: the value will immediately only be written to the local replica, and later disseminated with gossip
- WriteTo(n): the value will immediately be written to at least n replicas, including the local replica
- WriteMajority: the value will immediately be written to a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
- WriteAll: the value will immediately be written to all nodes in the cluster (or all nodes in the cluster role group)
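For example, incrementing a GCounter under different write consistency levels could look like the following sketch (the key name and timeout are illustrative; replicator and the implicit SelfUniqueAddress node from the sketches above are assumed to be in scope):

import scala.concurrent.duration._
import akka.cluster.ddata.{ GCounter, GCounterKey }
import akka.cluster.ddata.Replicator._

val CounterKey = GCounterKey("hit-counter") // hypothetical key name

// WriteLocal: written only to the local replica now, disseminated by gossip later
replicator ! Update(CounterKey, GCounter.empty, WriteLocal)(_ :+ 1)

// WriteMajority: acknowledged by at least N/2 + 1 replicas within the timeout
replicator ! Update(CounterKey, GCounter.empty, WriteMajority(timeout = 3.seconds))(_ :+ 1)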
As a reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.
You will always see your own writes. For example, if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.
In the Update message you can pass an optional request context, which the Replicator does not care about, but is included in the reply messages. This is a convenient way to pass contextual information (e.g. original sender) without having to use ask or local correlation data structures.
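A sketch of an actor that passes the original sender as request context and replies once the update has been acknowledged (the key name and reply messages are illustrative):

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.Replicator._

class CounterUpdater extends Actor {
  implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress
  private val replicator = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("hit-counter") // hypothetical key

  def receive: Receive = {
    case "increment" =>
      // the original sender travels as request context instead of using ask
      replicator ! Update(CounterKey, GCounter.empty, WriteMajority(3.seconds), Some(sender()))(_ :+ 1)

    case UpdateSuccess(CounterKey, Some(replyTo: ActorRef)) =>
      replyTo ! "ack"

    case UpdateTimeout(CounterKey, Some(replyTo: ActorRef)) =>
      // not a rollback: the update may still reach all nodes via gossip
      replyTo ! "uncertain"
  }
}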
Get
To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator. You supply a read consistency level which has the following meaning:
- ReadLocal: the value will only be read from the local replica
- ReadFrom(n): the value will be read and merged from n replicas, including the local replica
- ReadMajority: the value will be read and merged from a majority of replicas, i.e. at least N/2 + 1 replicas, where N is the number of nodes in the cluster (or cluster role group)
- ReadAll: the value will be read and merged from all nodes in the cluster (or all nodes in the cluster role group)
As a reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.
You will always read your own writes. For example, if you send an Update message followed by a Get of the same key, the Get will retrieve the change that was performed by the preceding Update message. However, the order of the reply messages is not defined, i.e. in the previous example you may receive the GetSuccess before the UpdateSuccess.
In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.
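A corresponding sketch for reading the value with ReadMajority and handling the possible replies, again passing the original sender as request context (key name and reply messages are illustrative):

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounterKey }
import akka.cluster.ddata.Replicator._

class CounterReader extends Actor {
  private val replicator = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("hit-counter") // hypothetical key

  def receive: Receive = {
    case "read" =>
      replicator ! Get(CounterKey, ReadMajority(timeout = 3.seconds), Some(sender()))

    case g @ GetSuccess(CounterKey, Some(replyTo: ActorRef)) =>
      replyTo ! g.get(CounterKey).value // current counter value

    case NotFound(CounterKey, Some(replyTo: ActorRef)) =>
      replyTo ! 0L

    case GetFailure(CounterKey, Some(replyTo: ActorRef)) =>
      // the read could not be completed within the consistency level and timeout
      replyTo ! "read-failed"
  }
}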
Subscribe
You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.
A subscriber is automatically removed if it is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.
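A sketch of a subscriber for the same illustrative key:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ddata.{ DistributedData, GCounterKey }
import akka.cluster.ddata.Replicator.{ Changed, Subscribe }

class CounterSubscriber extends Actor with ActorLogging {
  private val replicator = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("hit-counter") // hypothetical key

  override def preStart(): Unit =
    replicator ! Subscribe(CounterKey, self)

  def receive: Receive = {
    case c @ Changed(CounterKey) =>
      // delivered periodically (notify-subscribers-interval) or after an explicit FlushChanges
      log.info("Counter is now {}", c.get(CounterKey).value)
  }
}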
Delete
A data entry can be deleted by sending a Replicator.Delete message to the local Replicator. As a reply to the Delete, a Replicator.DeleteSuccess is sent to the sender of the Delete if the value was successfully deleted according to the supplied consistency level within the supplied timeout. Otherwise a Replicator.ReplicationDeleteFailure is sent. Note that ReplicationDeleteFailure does not mean that the delete completely failed or was rolled back. It may still have been replicated to some nodes, and may eventually be replicated to all nodes.
A deleted key cannot be reused, but it is still recommended to delete unused data entries because that reduces the replication overhead when new nodes join the cluster. Subsequent Delete, Update and Get requests will be answered with Replicator.DataDeleted, Replicator.UpdateDataDeleted and Replicator.GetDataDeleted respectively. Subscribers will receive Replicator.Deleted.
In the Delete message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming DeleteSuccess.
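A sketch of deleting the entry and handling the replies (key name and reply messages are illustrative):

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorRef }
import akka.cluster.ddata.{ DistributedData, GCounterKey }
import akka.cluster.ddata.Replicator._

class CounterRemover extends Actor {
  private val replicator = DistributedData(context.system).replicator
  private val CounterKey = GCounterKey("hit-counter") // hypothetical key

  def receive: Receive = {
    case "remove" =>
      replicator ! Delete(CounterKey, WriteMajority(3.seconds), Some(sender()))

    case DeleteSuccess(CounterKey, Some(replyTo: ActorRef)) =>
      replyTo ! "deleted"

    case ReplicationDeleteFailure(CounterKey, Some(replyTo: ActorRef)) =>
      // not a rollback: the delete may still reach some or all replicas later
      replyTo ! "uncertain"
  }
}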
CRDT Garbage
One thing that can be problematic with CRDTs is that some data types accumulate history (garbage). For example, a GCounter keeps track of one counter per node. If a GCounter has been updated from one node it will associate the identifier of that node forever. That can become a problem for long-running systems with many cluster nodes being added and removed. To solve this problem the Replicator performs pruning of data associated with nodes that have been removed from the cluster. Data types that need pruning have to implement RemovedNodePruning. The pruning consists of several steps:
- When a node is removed from the cluster it is first important that all updates that were done by that node are disseminated to all other nodes. The pruning will not start before the maxPruningDissemination duration has elapsed. The time measurement is stopped when any replica is unreachable, but it is still recommended to configure this with a certain margin. It should be in the magnitude of minutes.
- The nodes are ordered by their address and the node ordered first is called the leader. The leader initiates the pruning by adding a PruningInitialized marker in the data envelope. This is gossiped to all other nodes and they mark it as seen when they receive it.
- When the leader sees that all other nodes have seen the PruningInitialized marker, the leader performs the pruning and changes the marker to PruningPerformed so that nobody else will redo the pruning. The data envelope with this pruning state is a CRDT itself. The pruning is typically performed by "moving" the part of the data associated with the removed node to the leader node. For example, a GCounter is a Map with the node as key and the counts done by that node as value. When pruning, the value of the removed node is moved to the entry owned by the leader node. See RemovedNodePruning#prune.
- Thereafter the data is always cleared from parts associated with the removed node so that it does not come back when merging. See RemovedNodePruning#pruningCleanup.
- After another maxPruningDissemination duration after pruning the last entry from the removed node, the PruningPerformed markers in the data envelope are collapsed into a single tombstone entry, for efficiency. Clients may continue to use old data and therefore all data are always cleared from parts associated with tombstoned nodes.
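The maxPruningDissemination margin comes from the replicator settings. A minimal sketch of tuning it via configuration, assuming the standard akka.cluster.distributed-data configuration section used by the DistributedData extension:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// widen the dissemination margin before pruning starts (it should be in the
// magnitude of minutes); the system name is illustrative
val config = ConfigFactory
  .parseString("akka.cluster.distributed-data.max-pruning-dissemination = 600 s")
  .withFallback(ConfigFactory.load())

val tunedSystem = ActorSystem("ClusterSystem", config)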
- Source: Replicator.scala
Instance Constructors
- new Replicator(settings: ReplicatorSettings)
Type Members
- type Receive = PartialFunction[Any, Unit]
- Definition Classes
- Actor
Value Members
- final def !=(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- final def ##: Int
- Definition Classes
- AnyRef → Any
- def +(other: String): String
- Implicit
- This member is added by an implicit conversion from Replicator to any2stringadd[Replicator] performed by method any2stringadd in scala.Predef.
- Definition Classes
- any2stringadd
- def ->[B](y: B): (Replicator, B)
- Implicit
- This member is added by an implicit conversion from Replicator to ArrowAssoc[Replicator] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @inline()
- final def ==(arg0: Any): Boolean
- Definition Classes
- AnyRef → Any
- var allReachableClockTime: Long
- def aroundPostRestart(reason: Throwable): Unit
INTERNAL API. Can be overridden to intercept calls to postRestart. Calls postRestart by default.
- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
- def aroundPostStop(): Unit
INTERNAL API. Can be overridden to intercept calls to postStop. Calls postStop by default.
- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
- def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit
INTERNAL API. Can be overridden to intercept calls to preRestart. Calls preRestart by default.
- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
- def aroundPreStart(): Unit
INTERNAL API. Can be overridden to intercept calls to preStart. Calls preStart by default.
- Attributes
- protected[akka]
- Definition Classes
- Actor
- Annotations
- @InternalApi()
- def aroundReceive(rcv: actor.Actor.Receive, msg: Any): Unit
INTERNAL API. Can be overridden to intercept calls to this actor's current behavior.
- msg
current message.
- Attributes
- protected[akka]
- Definition Classes
- Replicator → Actor
- final def asInstanceOf[T0]: T0
- Definition Classes
- Any
- var changed: Set[KeyId]
- val clockTask: Cancellable
- def clone(): AnyRef
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.CloneNotSupportedException]) @HotSpotIntrinsicCandidate() @native()
- val cluster: Cluster
- def collectRemovedNodes(): Unit
- implicit val context: ActorContext
Scala API: Stores the context for this actor, including self, and sender. It is implicit to support operations such as forward.
WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
akka.actor.ActorContext is the Scala API. getContext returns a akka.actor.AbstractActor.ActorContext, which is the Java API of the actor context.
- Definition Classes
- Actor
- var dataEntries: Map[KeyId, (DataEnvelope, Digest, Timestamp)]
- def deleteObsoletePruningPerformed(): Unit
- val deltaPropagationSelector: DeltaPropagationSelector { val gossipIntervalDivisor: Int }
- val deltaPropagationTask: Option[Cancellable]
- def digest(envelope: DataEnvelope): (Digest, Int)
- returns
SHA-1 digest of the serialized data, and the size of the serialized data
- val durable: Set[KeyId]
- val durableStore: ActorRef
- val durableWildcards: Set[KeyId]
- def ensuring(cond: (Replicator) => Boolean, msg: => Any): Replicator
- Implicit
- This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: (Replicator) => Boolean): Replicator
- Implicit
- This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: Boolean, msg: => Any): Replicator
- Implicit
- This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- def ensuring(cond: Boolean): Replicator
- Implicit
- This member is added by an implicit conversion from Replicator to Ensuring[Replicator] performed by method Ensuring in scala.Predef.
- Definition Classes
- Ensuring
- final def eq(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- def equals(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef → Any
- var exitingNodes: SortedSet[UniqueAddress]
- val expiryEnabled: Boolean
- val expiryWildcards: Map[KeyId, FiniteDuration]
- var fullStateGossipEnabled: Boolean
- final def getClass(): Class[_ <: AnyRef]
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- def getData(key: KeyId): Option[DataEnvelope]
- def getDeltaSeqNr(key: KeyId, fromNode: UniqueAddress): Long
- def getDigest(key: KeyId): Digest
- def getExpiryDuration(key: KeyId): FiniteDuration
- def getUsedTimestamp(key: KeyId): Timestamp
- val gossipTask: Cancellable
- def gossipTo(address: UniqueAddress): Unit
- val hasDurableKeys: Boolean
- def hasSubscriber(subscriber: ActorRef): Boolean
- def hashCode(): Int
- Definition Classes
- AnyRef → Any
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- def initRemovedNodePruning(): Unit
- def isDurable(key: KeyId): Boolean
- def isExpired(key: KeyId, timestamp: Timestamp): Boolean
- def isExpired(key: KeyId): Boolean
- final def isInstanceOf[T0]: Boolean
- Definition Classes
- Any
- def isLeader: Boolean
- def isLocalGet(readConsistency: ReadConsistency): Boolean
- def isLocalSender(): Boolean
- def isLocalUpdate(writeConsistency: WriteConsistency): Boolean
- def isNodeRemoved(node: UniqueAddress, keys: Iterable[KeyId]): Boolean
- var joiningNodes: SortedSet[UniqueAddress]
- var leader: TreeSet[Member]
- val load: Receive
- def log: LoggingAdapter
- Definition Classes
- ActorLogging
- def matchingRole(m: Member): Boolean
- val maxPruningDisseminationNanos: Long
- var membersByAge: SortedSet[Member]
- final def ne(arg0: AnyRef): Boolean
- Definition Classes
- AnyRef
- val newSubscribers: HashMap[KeyId, Set[ActorRef]] with MultiMap[KeyId, ActorRef]
- var nodes: SortedSet[UniqueAddress]
- val normalReceive: Receive
- final def notify(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- final def notifyAll(): Unit
- Definition Classes
- AnyRef
- Annotations
- @HotSpotIntrinsicCandidate() @native()
- val notifyTask: Cancellable
- def performRemovedNodePruning(): Unit
- def postRestart(reason: Throwable): Unit
User overridable callback: By default it calls preStart(). Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
- reason
the Throwable that caused the restart to happen
- Definition Classes
- Actor
- Annotations
- @throws(classOf[Exception])
- def postStop(): Unit
User overridable callback.
Is called asynchronously after 'actor.stop()' is invoked. Empty default implementation.
- Definition Classes
- Replicator → Actor
- def preRestart(reason: Throwable, message: Option[Any]): Unit
Scala API: User overridable callback: By default it disposes of all children and then calls postStop(). Is called on a crashed Actor right BEFORE it is restarted to allow clean up of resources before Actor is terminated.
- reason
the Throwable that caused the restart to happen
- message
optionally the current message the actor processed when failing, if applicable
- Definition Classes
- Actor
- Annotations
- @throws(classOf[Exception])
- def preStart(): Unit
User overridable callback.
Is called when an Actor is started. Actors are automatically started asynchronously when created. Empty default implementation.
- Definition Classes
- Replicator → Actor
- var previousClockTime: Long
- val pruningTask: Option[Cancellable]
- def receive: actor.Actor.Receive
Scala API: This defines the initial actor behavior, it must return a partial function with the actor logic.
- Definition Classes
- Replicator → Actor
- def receiveClockTick(): Unit
- def receiveDelete(key: KeyR, consistency: WriteConsistency, req: Option[Any]): Unit
- def receiveDeltaPropagation(fromNode: UniqueAddress, reply: Boolean, deltas: Map[KeyId, Delta]): Unit
- def receiveDeltaPropagationTick(): Unit
- def receiveFlushChanges(): Unit
- Annotations
- @nowarn()
- def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): Unit
- def receiveGetKeyIds(): Unit
- def receiveGetReplicaCount(): Unit
- def receiveGossip(updatedData: Map[KeyId, (DataEnvelope, Timestamp)], sendBack: Boolean, fromSystemUid: Option[Long]): Unit
- def receiveGossipTick(): Unit
- def receiveMemberExiting(m: Member): Unit
- def receiveMemberJoining(m: Member): Unit
- def receiveMemberRemoved(m: Member): Unit
- def receiveMemberUp(m: Member): Unit
- def receiveMemberWeaklyUp(m: Member): Unit
- def receiveOtherMemberEvent(m: Member): Unit
- def receiveReachable(m: Member): Unit
- def receiveRead(key: KeyId): Unit
- def receiveReadRepair(key: KeyId, writeEnvelope: DataEnvelope): Unit
- def receiveRemovedNodePruningTick(): Unit
- def receiveStatus(otherDigests: Map[KeyId, (Digest, Timestamp)], chunk: Int, totChunks: Int, fromSystemUid: Option[Long]): Unit
- def receiveSubscribe(key: KeyR, subscriber: ActorRef): Unit
- def receiveTerminated(ref: ActorRef): Unit
- Annotations
- @nowarn()
- def receiveUnreachable(m: Member): Unit
- def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit
- def receiveUpdate[A <: ReplicatedData](key: KeyR, modify: (Option[A]) => A, writeConsistency: WriteConsistency, req: Option[Any]): Unit
- def receiveWrite(key: KeyId, envelope: DataEnvelope): Unit
- var removedNodes: Map[UniqueAddress, Long]
- def replica(node: UniqueAddress): ActorSelection
- var replyTo: ActorRef
- def selectRandomNode(addresses: IndexedSeq[UniqueAddress]): Option[UniqueAddress]
- implicit final val self: ActorRef
The 'self' field holds the ActorRef for this actor.
Can be used to send messages to itself:
self ! message
- Definition Classes
- Actor
- val selfAddress: Address
- val selfFromSystemUid: Some[Long]
- val selfUniqueAddress: UniqueAddress
- final def sender(): ActorRef
The reference sender Actor of the last received message. Is defined if the message was sent from another Actor, else deadLetters in akka.actor.ActorSystem.
WARNING: Only valid within the Actor itself, so do not close over it and publish it to other threads!
- Definition Classes
- Actor
- val serializer: Serializer
- def setData(key: KeyId, envelope: DataEnvelope): DataEnvelope
- var statusCount: Long
- var statusTotChunks: Int
- val subscribers: HashMap[KeyId, Set[ActorRef]] with MultiMap[KeyId, ActorRef]
- var subscriptionKeys: Map[KeyId, KeyR]
- val supervisorStrategy: OneForOneStrategy
User overridable definition of the strategy to use for supervising child actors.
- Definition Classes
- Replicator → Actor
- final def synchronized[T0](arg0: => T0): T0
- Definition Classes
- AnyRef
- def toString(): String
- Definition Classes
- AnyRef → Any
- def unhandled(message: Any): Unit
User overridable callback.
Is called when a message isn't handled by the current behavior of the actor. By default it fails with either an akka.actor.DeathPactException (in case of an unhandled akka.actor.Terminated message) or publishes an akka.actor.UnhandledMessage to the actor's system's akka.event.EventStream.
- Definition Classes
- Actor
- var unreachable: Set[UniqueAddress]
- def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit
- final def wait(arg0: Long, arg1: Int): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- final def wait(arg0: Long): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException]) @native()
- final def wait(): Unit
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.InterruptedException])
- var weaklyUpNodes: SortedSet[UniqueAddress]
- val wildcardSubscribers: HashMap[KeyId, Set[ActorRef]] with MultiMap[KeyId, ActorRef]
- def write(key: KeyId, writeEnvelope: DataEnvelope): Option[DataEnvelope]
- def writeAndStore(key: KeyId, writeEnvelope: DataEnvelope, reply: Boolean): Unit
Deprecated Value Members
- def finalize(): Unit
- Attributes
- protected[lang]
- Definition Classes
- AnyRef
- Annotations
- @throws(classOf[java.lang.Throwable]) @Deprecated
- Deprecated
(Since version 9)
- def formatted(fmtstr: String): String
- Implicit
- This member is added by an implicit conversion from Replicator to StringFormat[Replicator] performed by method StringFormat in scala.Predef.
- Definition Classes
- StringFormat
- Annotations
- @deprecated @inline()
- Deprecated
(Since version 2.12.16) Use formatString.format(value) instead of value.formatted(formatString), or use the f"" string interpolator. In Java 15 and later, formatted resolves to the new method in String which has reversed parameters.
- def →[B](y: B): (Replicator, B)
- Implicit
- This member is added by an implicit conversion from Replicator to ArrowAssoc[Replicator] performed by method ArrowAssoc in scala.Predef.
- Definition Classes
- ArrowAssoc
- Annotations
- @deprecated
- Deprecated
(Since version 2.13.0) Use -> instead. If you still wish to display it as one character, consider using a font with programming ligatures such as Fira Code.