Classic Distributed Data
Akka Classic pertains to the original Actor APIs, which have been improved by more type safe and guided Actor APIs. Akka Classic is still fully supported and existing applications can continue to use the classic APIs. It is also possible to use the new Actor APIs together with classic actors in the same ActorSystem, see coexistence. For new projects we recommend using the new Actor API.
For the full documentation of this feature and for new projects see Distributed Data.
Dependency
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
To use Akka Distributed Data, you must add the following dependency in your project:
- sbt
  val AkkaVersion = "2.10.0"
  libraryDependencies += "com.typesafe.akka" %% "akka-distributed-data" % AkkaVersion
Introduction
For the full documentation of this feature and for new projects see Distributed Data - Introduction.
Using the Replicator
The Replicator actor provides the API for interacting with the data. The Replicator actor must be started on each node in the cluster, or on each node in a group of nodes tagged with a specific role. It communicates with other Replicator instances with the same path (without address) that are running on other nodes. For convenience it can be used with the DistributedData extension, but it can also be started as an ordinary actor using Replicator.props. If it is started as an ordinary actor, it is important that it is given the same name, and started on the same path, on all nodes.
Cluster members with status WeaklyUp will participate in Distributed Data. This means that the data will be replicated to the WeaklyUp nodes with the background gossip protocol. Note that a WeaklyUp node will not participate in any actions where the consistency mode is to read/write from all nodes or the majority of nodes. The WeaklyUp node is not counted as part of the cluster. So 3 nodes + 5 WeaklyUp nodes is essentially a 3 node cluster as far as consistent actions are concerned.
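The arithmetic behind the "3 nodes + 5 WeaklyUp" remark can be sketched in plain Scala. This is a toy model and not the Akka API: `Status`, `Member`, `countedMembers` and `majoritySize` are hypothetical names introduced only for illustration, and the sketch assumes a majority is simply more than half of the counted members.

```scala
// Toy model, not Akka's implementation: all names here are hypothetical.
object MajorityExample {
  sealed trait Status
  case object Up extends Status
  case object WeaklyUp extends Status

  final case class Member(name: String, status: Status)

  // Only fully joined (Up) members count towards majority/all consistency levels.
  def countedMembers(members: Seq[Member]): Seq[Member] =
    members.filter(_.status == Up)

  // A majority is more than half of the counted members.
  def majoritySize(members: Seq[Member]): Int =
    countedMembers(members).size / 2 + 1

  // 3 Up nodes plus 5 WeaklyUp nodes.
  val cluster: Seq[Member] =
    (1 to 3).map(i => Member(s"up-$i", Up)) ++
      (1 to 5).map(i => Member(s"weak-$i", WeaklyUp))

  val counted: Int = countedMembers(cluster).size
  val majority: Int = majoritySize(cluster)
}
```

In this model only 3 of the 8 members are counted, so a majority write or read needs 2 acknowledgements, while the gossip protocol would still spread the data to all 8 nodes.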
Below is an example of an actor that schedules tick messages to itself and for each tick adds or removes elements from an ORSet (observed-remove set). It also subscribes to changes of this set.
- Scala
  import java.util.concurrent.ThreadLocalRandom
  import scala.concurrent.duration._
  import akka.actor.Actor
  import akka.actor.ActorLogging
  import akka.cluster.ddata.DistributedData
  import akka.cluster.ddata.ORSet
  import akka.cluster.ddata.ORSetKey
  import akka.cluster.ddata.Replicator._
  import akka.cluster.ddata.SelfUniqueAddress

  object DataBot {
    private case object Tick
  }

  class DataBot extends Actor with ActorLogging {
    import DataBot._

    val replicator = DistributedData(context.system).replicator
    implicit val node: SelfUniqueAddress = DistributedData(context.system).selfUniqueAddress

    import context.dispatcher
    val tickTask = context.system.scheduler.scheduleWithFixedDelay(5.seconds, 5.seconds, self, Tick)

    val DataKey = ORSetKey[String]("key")

    replicator ! Subscribe(DataKey, self)

    def receive = {
      case Tick =>
        val s = ThreadLocalRandom.current().nextInt(97, 123).toChar.toString
        if (ThreadLocalRandom.current().nextBoolean()) {
          // add
          log.info("Adding: {}", s)
          replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_ :+ s)
        } else {
          // remove
          log.info("Removing: {}", s)
          replicator ! Update(DataKey, ORSet.empty[String], WriteLocal)(_.remove(s))
        }

      case _: UpdateResponse[_] => // ignore

      case c @ Changed(DataKey) =>
        val data = c.get(DataKey)
        log.info("Current elements: {}", data.elements)
    }

    override def postStop(): Unit = tickTask.cancel()
  }
Update
For the full documentation of this feature and for new projects see Distributed Data - Update.
To modify and replicate a data value you send a Replicator.Update message to the local Replicator.
The current data value for the key of the Update is passed as parameter to the modify function of the Update. The function is supposed to return the new value of the data, which will then be replicated according to the given consistency level.
The modify function is called by the Replicator actor and must therefore be a pure function that only uses the data parameter and stable fields from the enclosing scope. It must for example not access the sender (sender()) reference of an enclosing actor.
Update is intended to only be sent from an actor running in the same local ActorSystem as the Replicator, because the modify function is typically not serializable.
- Scala
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator

  val Counter1Key = PNCounterKey("counter1")
  val Set1Key = GSetKey[String]("set1")
  val Set2Key = ORSetKey[String]("set2")
  val ActiveFlagKey = FlagKey("active")

  replicator ! Update(Counter1Key, PNCounter(), WriteLocal)(_ :+ 1)

  val writeTo3 = WriteTo(n = 3, timeout = 1.second)
  replicator ! Update(Set1Key, GSet.empty[String], writeTo3)(_ + "hello")

  val writeMajority = WriteMajority(timeout = 5.seconds)
  replicator ! Update(Set2Key, ORSet.empty[String], writeMajority)(_ :+ "hello")

  val writeAll = WriteAll(timeout = 5.seconds)
  replicator ! Update(ActiveFlagKey, Flag.Disabled, writeAll)(_.switchOn)
In reply to the Update, a Replicator.UpdateSuccess is sent to the sender of the Update if the value was successfully replicated according to the supplied write consistency level within the supplied timeout. Otherwise a Replicator.UpdateFailure subclass is sent back. Note that a Replicator.UpdateTimeout reply does not mean that the update completely failed or was rolled back. It may still have been replicated to some nodes, and will eventually be replicated to all nodes with the gossip protocol.
- Scala
  case UpdateSuccess(Set1Key, req) => // ok
  case UpdateTimeout(Set1Key, req) =>
    // write to 3 nodes failed within 1.second
You will always see your own writes. For example, if you send two Update messages changing the value of the same key, the modify function of the second message will see the change that was performed by the first Update message.
It is possible to abort the Update by throwing an exception from the modify function after inspecting the state parameter that is passed in to it. The exception is thrown before the update is performed, and a Replicator.ModifyFailure is sent back as reply.
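The two behaviours described above (seeing your own writes, and aborting by throwing from the modify function) can be illustrated with a small plain-Scala model. This is a simplified sketch and not the Akka Replicator: `LocalReplica`, `UpdateOk` and `ModifyFailed` are hypothetical names, and the model only covers a single local replica without any replication.

```scala
import scala.util.{Failure, Success, Try}

// Toy model of a local replica, not Akka's Replicator: names are hypothetical.
object ModifyExample {
  sealed trait Reply
  case object UpdateOk extends Reply
  final case class ModifyFailed(cause: Throwable) extends Reply

  final class LocalReplica {
    private var data = Map.empty[String, Set[String]]

    // Applies `modify` to the current value (or `initial` if the key is absent).
    // If `modify` throws, the stored value is left untouched.
    def update(key: String, initial: Set[String])(modify: Set[String] => Set[String]): Reply =
      Try(modify(data.getOrElse(key, initial))) match {
        case Success(newValue) =>
          data = data.updated(key, newValue)
          UpdateOk
        case Failure(e) =>
          ModifyFailed(e)
      }

    def get(key: String): Option[Set[String]] = data.get(key)
  }

  val replica = new LocalReplica

  val first: Reply = replica.update("set1", Set.empty)(_ + "a")

  // The second modify function sees the "a" written by the first update.
  val second: Reply = replica.update("set1", Set.empty) { current =>
    require(current.contains("a"))
    current + "b"
  }

  // Aborting: throw from the modify function when the state is not acceptable.
  val third: Reply = replica.update("set1", Set.empty) { current =>
    if (current.size >= 2) throw new IllegalStateException("set is full")
    current + "c"
  }
}
```

In this model the third update is rejected and the stored set remains unchanged, which mirrors the statement that the abort happens before the update is performed.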
In the Update message you can pass an optional request context, which the Replicator does not care about, but is included in the reply messages. This is a convenient way to pass contextual information (e.g. original sender) without having to use ask or maintain local correlation data structures.
- Scala
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator
  val writeTwo = WriteTo(n = 2, timeout = 3.second)
  val Counter1Key = PNCounterKey("counter1")

  def receive: Receive = {
    case "increment" =>
      // incoming command to increase the counter
      val upd = Update(Counter1Key, PNCounter(), writeTwo, request = Some(sender()))(_ :+ 1)
      replicator ! upd

    case UpdateSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! "ack"
    case UpdateTimeout(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! "nack"
  }
Get
For the full documentation of this feature and for new projects see Distributed Data - Get.
To retrieve the current value of a data entry you send a Replicator.Get message to the Replicator. You supply a read consistency level, such as ReadLocal, ReadFrom, ReadMajority or ReadAll:
- Scala
  val replicator = DistributedData(system).replicator
  val Counter1Key = PNCounterKey("counter1")
  val Set1Key = GSetKey[String]("set1")
  val Set2Key = ORSetKey[String]("set2")
  val ActiveFlagKey = FlagKey("active")

  replicator ! Get(Counter1Key, ReadLocal)

  val readFrom3 = ReadFrom(n = 3, timeout = 1.second)
  replicator ! Get(Set1Key, readFrom3)

  val readMajority = ReadMajority(timeout = 5.seconds)
  replicator ! Get(Set2Key, readMajority)

  val readAll = ReadAll(timeout = 5.seconds)
  replicator ! Get(ActiveFlagKey, readAll)
In reply to the Get, a Replicator.GetSuccess is sent to the sender of the Get if the value was successfully retrieved according to the supplied read consistency level within the supplied timeout. Otherwise a Replicator.GetFailure is sent. If the key does not exist the reply will be Replicator.NotFound.
- Scala
  case g @ GetSuccess(Counter1Key, req) =>
    val value = g.get(Counter1Key).value
  case NotFound(Counter1Key, req) => // key counter1 does not exist
- Scala
  case g @ GetSuccess(Set1Key, req) =>
    val elements = g.get(Set1Key).elements
  case GetFailure(Set1Key, req) =>
    // read from 3 nodes failed within 1.second
  case NotFound(Set1Key, req) => // key set1 does not exist
In the Get message you can pass an optional request context in the same way as for the Update message, described above. For example the original sender can be passed and replied to after receiving and transforming GetSuccess.
- Scala
  implicit val node: SelfUniqueAddress = DistributedData(system).selfUniqueAddress
  val replicator = DistributedData(system).replicator
  val readTwo = ReadFrom(n = 2, timeout = 3.second)
  val Counter1Key = PNCounterKey("counter1")

  def receive: Receive = {
    case "get-count" =>
      // incoming request to retrieve current value of the counter
      replicator ! Get(Counter1Key, readTwo, request = Some(sender()))

    case g @ GetSuccess(Counter1Key, Some(replyTo: ActorRef)) =>
      val value = g.get(Counter1Key).value.longValue
      replyTo ! value
    case GetFailure(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! -1L
    case NotFound(Counter1Key, Some(replyTo: ActorRef)) =>
      replyTo ! 0L
  }
Subscribe
For the full documentation of this feature and for new projects see Distributed Data - Subscribe.
You may also register interest in change notifications by sending a Replicator.Subscribe message to the Replicator. It will send Replicator.Changed messages to the registered subscriber when the data for the subscribed key is updated. Subscribers will be notified periodically with the configured notify-subscribers-interval, and it is also possible to send an explicit Replicator.FlushChanges message to the Replicator to notify the subscribers immediately.
The subscriber is automatically removed if the subscriber is terminated. A subscriber can also be deregistered with the Replicator.Unsubscribe message.
- Scala
  val replicator = DistributedData(system).replicator
  val Counter1Key = PNCounterKey("counter1")
  // subscribe to changes of the Counter1Key value
  replicator ! Subscribe(Counter1Key, self)
  var currentValue = BigInt(0)

  def receive: Receive = {
    case c @ Changed(Counter1Key) =>
      currentValue = c.get(Counter1Key).value
    case "get-count" =>
      // incoming request to retrieve current value of the counter
      sender() ! currentValue
  }
Consistency
For the full documentation of this feature and for new projects see Distributed Data Consistency.
Here is an example of using WriteMajority and ReadMajority:
- Scala
  private val timeout = 3.seconds
  private val readMajority = ReadMajority(timeout)
  private val writeMajority = WriteMajority(timeout)
- Scala
  def receiveGetCart: Receive = {
    case GetCart =>
      replicator ! Get(DataKey, readMajority, Some(sender()))

    case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) =>
      val data = g.get(DataKey)
      val cart = Cart(data.entries.values.toSet)
      replyTo ! cart

    case NotFound(DataKey, Some(replyTo: ActorRef)) =>
      replyTo ! Cart(Set.empty)

    case GetFailure(DataKey, Some(replyTo: ActorRef)) =>
      // ReadMajority failure, try again with local read
      replicator ! Get(DataKey, ReadLocal, Some(replyTo))
  }
- Scala
  def receiveAddItem: Receive = {
    case cmd @ AddItem(item) =>
      val update = Update(DataKey, LWWMap.empty[String, LineItem], writeMajority, Some(cmd)) { cart =>
        updateCart(cart, item)
      }
      replicator ! update
  }
In some rare cases, when performing an Update, it is necessary to first try to fetch the latest data from other nodes. That can be done by first sending a Get with ReadMajority and then continuing with the Update when the GetSuccess, GetFailure or NotFound reply is received. This might be needed when you need to base a decision on the latest information or when removing entries from an ORSet or ORMap. If an entry is added to an ORSet or ORMap from one node and removed from another node, the entry will only be removed if the added entry is visible on the node where the removal is performed (hence the name observed-remove set).
The following example illustrates how to do that:
- Scala
  def receiveRemoveItem: Receive = {
    case cmd @ RemoveItem(productId) =>
      // Try to fetch latest from a majority of nodes first, since ORMap
      // remove must have seen the item to be able to remove it.
      replicator ! Get(DataKey, readMajority, Some(cmd))

    case GetSuccess(DataKey, Some(RemoveItem(productId))) =>
      replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
        _.remove(node, productId)
      }

    case GetFailure(DataKey, Some(RemoveItem(productId))) =>
      // ReadMajority failed, fall back to best effort local value
      replicator ! Update(DataKey, LWWMap(), writeMajority, None) {
        _.remove(node, productId)
      }

    case NotFound(DataKey, Some(RemoveItem(productId))) =>
      // nothing to remove
  }
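Why a remove must have observed the add can be seen in a toy observed-remove set written in plain Scala. This sketch uses the classic tombstone-based formulation, where each add carries a unique tag and a remove only covers the tags the removing replica has seen. It is not how Akka's ORSet is implemented, and `ToyORSet`, `Tag` and the node names are hypothetical.

```scala
// Toy observed-remove set, not Akka's ORSet: all names are hypothetical.
object ORSetExample {
  // Each add is tagged with a unique (node, counter) pair.
  type Tag = (String, Long)

  final case class ToyORSet[A](
      adds: Set[(A, Tag)] = Set.empty[(A, Tag)],
      removes: Set[(A, Tag)] = Set.empty[(A, Tag)]) {

    // An element is present if at least one of its add-tags has not been removed.
    def elements: Set[A] = (adds -- removes).map(_._1)

    def add(node: String, elem: A): ToyORSet[A] = {
      val counter = adds.count(_._2._1 == node).toLong + 1
      copy(adds = adds + ((elem, (node, counter))))
    }

    // Remove only the add-tags that this replica has observed.
    def remove(elem: A): ToyORSet[A] =
      copy(removes = removes ++ adds.filter(_._1 == elem))

    def merge(that: ToyORSet[A]): ToyORSet[A] =
      ToyORSet(adds ++ that.adds, removes ++ that.removes)
  }

  // n1 adds an item.
  val n1: ToyORSet[String] = ToyORSet[String]().add("n1", "item")

  // n2 has not yet seen the add, so its remove has nothing to remove.
  val n2Blind: ToyORSet[String] = ToyORSet[String]().remove("item")
  val afterBlindRemove: Set[String] = n1.merge(n2Blind).elements

  // After n2 has observed the add (for example via a majority read),
  // the remove takes effect once the replicas merge.
  val n2Observed: ToyORSet[String] = ToyORSet[String]().merge(n1).remove("item")
  val afterObservedRemove: Set[String] = n1.merge(n2Observed).elements
}
```

In this model the blind remove leaves the item in place after merging, whereas the remove performed after observing the add eliminates it, which is the reason for reading from a majority before removing.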
Caveat: Even if you use WriteMajority and ReadMajority there is a small risk that you may read stale data if the cluster membership has changed between the Update and the Get. For example, in a cluster of 5 nodes an Update is written to 3 nodes: n1, n2, n3. Then 2 more nodes are added and a Get request reads from 4 nodes, which happen to be n4, n5, n6, n7, i.e. the value on n1, n2, n3 is not seen in the response of the Get request.
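The scenario in this caveat can be checked with a few lines of plain Scala set arithmetic. This is only an illustration under the assumption that a majority is `n / 2 + 1`; the object and value names are hypothetical and nothing here uses the Akka API.

```scala
// Illustration of the stale-read caveat with plain sets; names are hypothetical.
object StaleReadExample {
  def majority(n: Int): Int = n / 2 + 1

  val oldCluster: Set[String] = (1 to 5).map(i => s"n$i").toSet
  // WriteMajority in the 5 node cluster reaches 3 nodes.
  val writtenTo: Set[String] = Set("n1", "n2", "n3")

  // Two nodes join after the write.
  val newCluster: Set[String] = oldCluster ++ Set("n6", "n7")
  // One possible ReadMajority selection in the 7 node cluster.
  val readFrom: Set[String] = Set("n4", "n5", "n6", "n7")

  val writeWasMajority: Boolean = writtenTo.size >= majority(oldCluster.size)
  val readIsMajority: Boolean = readFrom.size >= majority(newCluster.size)

  // No node that received the write is part of the read.
  val overlap: Set[String] = writtenTo.intersect(readFrom)

  // With unchanged membership, two majorities of 5 nodes always share a node.
  val overlapGuaranteedWithoutChange: Boolean = 2 * majority(oldCluster.size) > oldCluster.size
}
```

Both operations satisfy their majority requirement, yet the intersection is empty, so the read can miss the write until gossip has spread it further.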
Delete
For the full documentation of this feature and for new projects see Distributed Data - Delete.
- Scala
  val replicator = DistributedData(system).replicator
  val Counter1Key = PNCounterKey("counter1")
  val Set2Key = ORSetKey[String]("set2")

  replicator ! Delete(Counter1Key, WriteLocal)

  val writeMajority = WriteMajority(timeout = 5.seconds)
  replicator ! Delete(Set2Key, writeMajority)
As deleted keys continue to be included in the stored data on each node as well as in gossip messages, a continuous series of updates and deletes of top-level entities will result in growing memory usage until an ActorSystem runs out of memory. To use Akka Distributed Data where frequent adds and removes are required, you should use a fixed number of top-level data types that support both updates and removals, for example ORMap or ORSet.
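The difference between the two usage patterns can be sketched with a toy store in plain Scala. This is a deliberately simplified model and not Akka's storage format: `Entry`, `Live`, `Deleted` and `modifySet` are hypothetical names, and the single top-level set is modelled as an ordinary immutable `Set`.

```scala
// Toy model of top-level keys with tombstones; not Akka's implementation.
object TombstoneExample {
  sealed trait Entry
  final case class Live(elements: Set[String]) extends Entry
  case object Deleted extends Entry // tombstone: the key stays in the store

  // Pattern 1: one top-level key per entity, deleted when the entity goes away.
  val perEntityKeys: Map[String, Entry] =
    (1 to 1000).foldLeft(Map.empty[String, Entry]) { (store, i) =>
      store.updated(s"entity-$i", Live(Set("payload"))).updated(s"entity-$i", Deleted)
    }

  // Helper that modifies the set stored under a live key.
  def modifySet(store: Map[String, Entry], key: String)(f: Set[String] => Set[String]): Map[String, Entry] =
    store.get(key) match {
      case Some(Live(elems)) => store.updated(key, Live(f(elems)))
      case _                 => store
    }

  // Pattern 2: a single top-level set; entities are added to and removed from it.
  val singleKey: Map[String, Entry] =
    (1 to 1000).foldLeft(Map[String, Entry]("entities" -> Live(Set.empty))) { (store, i) =>
      val added = modifySet(store, "entities")(_ + s"entity-$i")
      modifySet(added, "entities")(_ - s"entity-$i")
    }
}
```

After 1000 add and remove cycles the first pattern has accumulated 1000 tombstoned keys in this model, while the second still holds one top-level entry.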
Replicated data types
Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types. For the full documentation of this feature and for new projects see Distributed Data Replicated data types.
Delta-CRDT
For the full documentation of this feature and for new projects see Distributed Data Delta CRDT.
Custom Data Type
You can implement your own data types. For the full documentation of this feature and for new projects see Distributed Data custom data type.
Durable Storage
For the full documentation of this feature and for new projects see Durable Storage.
Limitations
For the full documentation of this feature and for new projects see Limitations.
Learn More about CRDTs
- Strong Eventual Consistency and Conflict-free Replicated Data Types (video) talk by Marc Shapiro
- A comprehensive study of Convergent and Commutative Replicated Data Types paper by Marc Shapiro et al.
Configuration
The DistributedData extension can be configured with the following properties:
# Settings for the DistributedData extension
akka.cluster.distributed-data {
  # Actor name of the Replicator actor, /system/ddataReplicator
  name = ddataReplicator

  # Replicas are running on members tagged with this role.
  # All members are used if undefined or empty.
  role = ""

  # How often the Replicator should send out gossip information
  gossip-interval = 2 s

  # How often the subscribers will be notified of changes, if any
  notify-subscribers-interval = 500 ms

  # Logging of data with payload size in bytes larger than
  # this value. Maximum detected size per key is logged once,
  # with an increase threshold of 10%.
  # It can be disabled by setting the property to off.
  log-data-size-exceeding = 10 KiB

  # Maximum number of entries to transfer in one round of gossip exchange when
  # synchronizing the replicas. Next chunk will be transferred in next round of gossip.
  # The actual number of data entries in each Gossip message is dynamically
  # adjusted to not exceed the maximum remote message size (maximum-frame-size).
  max-delta-elements = 500

  # The id of the dispatcher to use for Replicator actors.
  # If specified you need to define the settings of the actual dispatcher.
  use-dispatcher = "akka.actor.internal-dispatcher"

  # How often the Replicator checks for pruning of data associated with
  # removed cluster nodes. If this is set to 'off' the pruning feature will
  # be completely disabled.
  pruning-interval = 120 s

  # How long it takes to spread the data to all other replica nodes.
  # This is used when initiating and completing the pruning process of data associated
  # with removed cluster nodes. The time measurement is stopped when any replica is
  # unreachable, but it's still recommended to configure this with a certain margin.
  # It should be in the magnitude of minutes even though typical dissemination time
  # is shorter (grows logarithmically with number of nodes). There is no advantage of
  # setting this too low. Setting it to a large value will delay the pruning process.
  max-pruning-dissemination = 300 s

  # The markers indicating that pruning has been performed for a removed node are kept
  # for this time and thereafter removed. If an old data entry that was never pruned is somehow
  # injected and merged with existing data after this time the value will not be correct.
  # This would be possible (although unlikely) in the case of a long network partition.
  # It should be in the magnitude of hours. For durable data it is configured by
  # 'akka.cluster.distributed-data.durable.pruning-marker-time-to-live'.
  pruning-marker-time-to-live = 6 h

  # Serialized Write and Read messages are cached when they are sent to
  # several nodes. If there is no further activity they are removed from the cache
  # after this duration.
  serializer-cache-time-to-live = 10s

  # Update and Get operations are sent to oldest nodes first.
  # This is useful together with Cluster Singleton, which is running on oldest nodes.
  prefer-oldest = off

  # Settings for delta-CRDT
  delta-crdt {
    # enable or disable delta-CRDT replication
    enabled = on

    # Some complex deltas grow in size for each update and above this
    # threshold such deltas are discarded and sent as full state instead.
    # This is the number of elements or similar size hint, not size in bytes.
    max-delta-size = 50
  }

  # Map of keys and inactivity duration for entries that will automatically be removed
  # without tombstones when they have been inactive for the given duration.
  # Prefix matching is supported by using * at the end of a key.
  # Matching tombstones will also be removed after the expiry duration.
  expire-keys-after-inactivity {
    # Example syntax:
    # "key-1" = 10 minutes
    # "cache-*" = 2 minutes
  }

  durable {
    # List of keys that are durable. Prefix matching is supported by using * at the
    # end of a key.
    keys = []

    # The markers indicating that pruning has been performed for a removed node are kept
    # for this time and thereafter removed. If an old data entry that was never pruned is
    # injected and merged with existing data after this time the value will not be correct.
    # This would be possible if a replica with durable data didn't participate in the pruning
    # (e.g. it was shutdown) and later started after this time. A durable replica should not
    # be stopped for a longer time than this duration and if it is joining again after this
    # duration its data should first be manually removed (from the lmdb directory).
    # It should be in the magnitude of days. Note that there is a corresponding setting
    # for non-durable data: 'akka.cluster.distributed-data.pruning-marker-time-to-live'.
    pruning-marker-time-to-live = 10 d

    # Fully qualified class name of the durable store actor. It must be a subclass
    # of akka.actor.Actor and handle the protocol defined in
    # akka.cluster.ddata.DurableStore. The class must have a constructor with
    # com.typesafe.config.Config parameter.
    store-actor-class = akka.cluster.ddata.LmdbDurableStore

    use-dispatcher = akka.cluster.distributed-data.durable.pinned-store

    pinned-store {
      executor = thread-pool-executor
      type = PinnedDispatcher
    }

    # Config for the LmdbDurableStore
    lmdb {
      # Directory of LMDB file. There are two options:
      # 1. A relative or absolute path to a directory that ends with 'ddata'
      #    the full name of the directory will contain name of the ActorSystem
      #    and its remote port.
      # 2. Otherwise the path is used as is, as a relative or absolute path to
      #    a directory.
      #
      # When running in production you may want to configure this to a specific
      # path (alt 2), since the default directory contains the remote port of the
      # actor system to make the name unique. If using a dynamically assigned
      # port (0) it will be different each time and the previously stored data
      # will not be loaded.
      dir = "ddata"

      # Size in bytes of the memory mapped file.
      map-size = 100 MiB

      # Accumulate changes before storing improves performance with the
      # risk of losing the last writes if the JVM crashes.
      # The interval is by default set to 'off' to write each update immediately.
      # Enabling write behind by specifying a duration, e.g. 200ms, is especially
      # efficient when performing many writes to the same key, because it is only
      # the last value for each key that will be serialized and stored.
      # write-behind-interval = 200 ms
      write-behind-interval = off
    }
  }
}