Distributed Data
Dependency
To use Akka Cluster Distributed Data Typed, you must add the following dependency to your project:
Introduction
This module is ready to be used in production, but it is still marked as may change. This means that the API or semantics can change without warning or a deprecation period, but such changes will be collected and made in Akka 2.6.0 rather than in 2.5.x patch releases.
Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed through an actor that provides a key-value-store-like API. The keys are unique identifiers that also carry type information about the data values. The values are Conflict-Free Replicated Data Types (CRDTs).
All data entries are spread to all nodes, or to the nodes with a certain role, in the cluster via direct replication and gossip-based dissemination. You have fine-grained control of the consistency level for reads and writes.
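To get a feel for what the consistency level controls, the sketch below computes how many replicas a majority-style read or write involves in a cluster of n nodes, and whether a read is then guaranteed to overlap the latest completed write. The helpers `majority` and `readSeesWrite` are hypothetical and only model the arithmetic (a majority being at least n/2 + 1 nodes); they are not part of the Akka API, and Akka's real majority levels also take a timeout and a minimum cap, which are ignored here.

```scala
// Hypothetical model of quorum sizes; NOT the Akka API.
// n = number of nodes in the cluster (or in the role group).
def majority(n: Int): Int = n / 2 + 1

// A read is guaranteed to see a completed write only if the nodes written
// and the nodes read must share at least one node.
def readSeesWrite(n: Int, written: Int, read: Int): Boolean =
  written + read > n

val n = 5
val w = majority(n) // nodes that acknowledge a majority write
val r = majority(n) // nodes consulted by a majority read

// Majority writes combined with majority reads always overlap.
println(s"majority($n) = $w, overlap = ${readSeesWrite(n, w, r)}")
// A local write on one node and a local read on another guarantee no overlap.
println(s"local/local overlap = ${readSeesWrite(n, 1, 1)}")
```

Stronger levels trade latency and availability for this overlap, which is why the choice is made per read and per write.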
The nature of CRDTs makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes are automatically resolved by the monotonic merge function, which all data types must provide. The state changes always converge. Several useful data types for counters, sets, maps and registers are provided, and you can also implement your own custom data types.
It is eventually consistent and geared toward providing high read and write availability (partition tolerance), with low latency. Note that in an eventually consistent system a read may return an out-of-date value.
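The following is a minimal, self-contained sketch of a state-based grow-only counter that illustrates the monotonic merge described above. It is an illustrative model only, not Akka's `akka.cluster.ddata.GCounter` implementation; the functions `increment`, `value` and `merge` and the string node ids are assumptions made for the example.

```scala
// Illustrative state-based G-Counter; NOT Akka's GCounter implementation.
// Each node only increments its own slot; the value is the sum of all slots.
type GCounterState = Map[String, Long]

def increment(state: GCounterState, node: String, n: Long = 1L): GCounterState =
  state.updated(node, state.getOrElse(node, 0L) + n)

def value(state: GCounterState): Long = state.values.sum

// Monotonic merge: take the per-node maximum. It is commutative, associative
// and idempotent, so replicas converge regardless of delivery order or duplicates.
def merge(a: GCounterState, b: GCounterState): GCounterState =
  (a.keySet ++ b.keySet).iterator.map { k =>
    k -> math.max(a.getOrElse(k, 0L), b.getOrElse(k, 0L))
  }.toMap

// Two nodes update concurrently, without any coordination.
val onA = increment(increment(Map.empty, "A"), "A") // A has counted 2
val onB = increment(Map.empty, "B", 5)               // B has counted 5

// Until it merges onB, a replica holding only onA reads an out-of-date value.
println(value(onA)) // 2

val ab = merge(onA, onB)
val ba = merge(onB, onA)
println(value(ab)) // 7, the same result whichever order the merge happens in
```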
Using the Replicator
The akka.cluster.ddata.typed.scaladsl.Replicator actor provides the API for interacting with the data and is accessed through the extension akka.cluster.ddata.typed.scaladsl.DistributedData.
The messages for the replicator, such as Replicator.Update, are defined in akka.cluster.ddata.typed.scaladsl.Replicator, but the actual CRDTs are the same as in untyped, for example akka.cluster.ddata.GCounter. Modifying CRDTs such as GCounter requires an implicit akka.cluster.ddata.SelfUniqueAddress, which is available from implicit val node = DistributedData(system).selfUniqueAddress.
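The implicit node address is needed because a GCounter keeps one entry per node, so an increment has to know which node is performing it. The sketch below mimics that shape with hypothetical stand-ins: `NodeAddress` plays the role of SelfUniqueAddress and `ToyCounter` the role of GCounter. Neither is an Akka type; only the pattern of an operator taking the node as an implicit parameter mirrors GCounter's `:+`.

```scala
// Hypothetical stand-ins for illustration only: in Akka the implicit is
// akka.cluster.ddata.SelfUniqueAddress and the counter is GCounter.
final case class NodeAddress(id: String)

final case class ToyCounter(slots: Map[String, Long] = Map.empty) {
  // The increment must know which node is writing, so the node address
  // is taken as an implicit parameter, as with GCounter's :+.
  def :+(n: Long)(implicit node: NodeAddress): ToyCounter =
    ToyCounter(slots.updated(node.id, slots.getOrElse(node.id, 0L) + n))

  def value: Long = slots.values.sum
}

// With Akka this would be: implicit val node = DistributedData(system).selfUniqueAddress
implicit val node: NodeAddress = NodeAddress("node-1")

// Both increments pick up the implicit node and land in its slot.
val counter = ToyCounter() :+ 1 :+ 2
println(counter.value) // 3
```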
The replicator can contain multiple entries, each containing a replicated data type. We therefore need to create a key that identifies the entry and tells us what type it has, and then use that key for every interaction with the replicator. Each replicated data type contains a factory for defining such a key.
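To show why a key that carries type information is useful, here is a hypothetical, in-memory sketch of a store holding entries of different types, where the key's type parameter determines the type of the value read back. `Key` and `TypedStore` are illustrative names, not Akka classes; with Akka you would instead use a data type's key factory, for example GCounterKey("counter") as in the sample.

```scala
// Hypothetical illustration of type-carrying keys; NOT the Akka key classes.
final case class Key[A](id: String)

// A store that can hold entries of different types, each looked up by its key.
final class TypedStore {
  private var entries = Map.empty[String, Any]

  def put[A](key: Key[A], value: A): Unit =
    entries = entries.updated(key.id, value)

  // The key's type parameter tells the caller what type the value has.
  def get[A](key: Key[A]): Option[A] =
    entries.get(key.id).map(_.asInstanceOf[A])
}

val CounterKey = Key[Long]("counter")
val NamesKey   = Key[Set[String]]("names")

val store = new TypedStore
store.put(CounterKey, 42L)
store.put(NamesKey, Set("a", "b"))

val names: Option[Set[String]] = store.get(NamesKey) // no cast needed by the caller
```

Because `Key` is invariant in its type parameter, a call such as `store.put(CounterKey, "text")` should be rejected by the compiler.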
This sample uses the replicated data type GCounter to implement a counter that can be written to on any node of the cluster:
```scala
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ddata.{ GCounter, GCounterKey, SelfUniqueAddress }
import akka.cluster.ddata.typed.scaladsl.Replicator

// Public protocol of the counter client
sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand

// Internal messages that wrap the replicator's messages (via message adapters)
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter]) extends InternalMsg
private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg

val Key = GCounterKey("counter")

def client(replicator: ActorRef[Replicator.Command])(implicit node: SelfUniqueAddress): Behavior[ClientCommand] =
  Behaviors.setup[ClientCommand] { ctx =>
    // Adapters translate the replicator's messages into our own protocol
    val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
      ctx.messageAdapter(InternalUpdateResponse.apply)
    val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
      ctx.messageAdapter(InternalGetResponse.apply)
    val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
      ctx.messageAdapter(InternalChanged.apply)

    // Get notified whenever the counter changes
    replicator ! Replicator.Subscribe(Key, changedAdapter)

    def behavior(cachedValue: Int): Behavior[ClientCommand] = {
      Behaviors.receiveMessage[ClientCommand] {
        case Increment =>
          replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ :+ 1)
          Behaviors.same

        case GetValue(replyTo) =>
          // replyTo is passed along and comes back in the GetSuccess response
          replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
          Behaviors.same

        case GetCachedValue(replyTo) =>
          replyTo ! cachedValue
          Behaviors.same

        case internal: InternalMsg =>
          internal match {
            case InternalUpdateResponse(_) => Behaviors.same // ok

            case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) =>
              val value = rsp.get(Key).value.toInt
              replyTo ! value
              Behaviors.same

            case InternalGetResponse(_) =>
              Behaviors.unhandled // not dealing with failures

            case InternalChanged(chg @ Replicator.Changed(Key)) =>
              // Cache the new value by switching to a new behavior
              val value = chg.get(Key).value.intValue
              behavior(value)
          }
      }
    }

    behavior(cachedValue = 0)
  }
```
When we start up the actor, we subscribe it to changes for our key. This means that whenever the replicator sees a change for the counter, our actor gets a Replicator.Changed[GCounter]. Since this is not a message in our protocol, we use a message adapter to wrap it in the internal InternalChanged message, which is then handled in the regular message handling of the behavior.
For an incoming Increment command, we send the replicator a Replicator.Update request. It contains five values:
- the Key we want to update
- the data to use as the empty state if the replicator has not seen the key before
- the consistency level we want for the update
- an ActorRef[Replicator.UpdateResponse[GCounter]] to respond to when the update is completed
- a function that takes the previous state and updates it, in our case by incrementing it by 1
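The role of the initial state and the modify function can be modelled on a single node as below. This is a hypothetical sketch, not the replicator's implementation: the `update` helper is an assumption, and it leaves out the consistency level, replication and the reply to the ActorRef. It only mirrors the curried shape Update(key, initial, consistency, replyTo)(modify).

```scala
// Hypothetical, single-node model of Update(key, initial, consistency, replyTo)(modify).
// Consistency levels, replication and replies are deliberately left out.
def update[A](store: Map[String, A], key: String, initial: A)(modify: A => A): Map[String, A] = {
  // Start from the stored state if the key has been seen before,
  // otherwise from the supplied empty state (GCounter.empty in the sample).
  val current = store.getOrElse(key, initial)
  store.updated(key, modify(current))
}

var store = Map.empty[String, Long]
store = update(store, "counter", 0L)(_ + 1) // key not seen yet: 0 + 1
store = update(store, "counter", 0L)(_ + 1) // key seen before:  1 + 1
println(store("counter")) // 2
```

Passing a function rather than a new value lets the replicator apply the change to whatever state it currently holds for the key.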
Whenever the distributed counter is updated, we cache the value, so that the GetCachedValue command can be answered without an extra interaction with the replicator.
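The cache works because the value is not kept in a mutable field but passed as the parameter of behavior(cachedValue), and each change notification returns a new behavior holding the new value. The following actor-free sketch models that idea with plain functions; `Handler`, `Changed` and `GetCached` are hypothetical stand-ins, not Akka types.

```scala
// Hypothetical, actor-free model of the client's caching: the state lives in
// the parameter of `behavior`, never in a mutable field.
sealed trait Msg
final case class Changed(newValue: Int) extends Msg
final case class GetCached(replyTo: Int => Unit) extends Msg

// A "behavior" here is just a function from a message to the next behavior.
final case class Handler(receive: Msg => Handler)

def behavior(cachedValue: Int): Handler = Handler {
  case Changed(v) =>
    behavior(v) // switch to a behavior that remembers the new value
  case GetCached(replyTo) =>
    replyTo(cachedValue) // answered locally, no replicator round trip
    behavior(cachedValue)
}

var answers = List.empty[Int]
val afterChange = behavior(cachedValue = 0).receive(Changed(5))
afterChange.receive(GetCached(v => answers = v :: answers))
println(answers) // List(5)
```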
We also support asking the replicator for the value, using the GetValue command. This demonstrates how many of the replicator commands take a pass-along value that is put in the response message. That way we do not need to keep local state tracking which actors are waiting for responses, but can extract the replyTo actor from the GetSuccess response when the replicator replies. See the untyped Distributed Data documentation for more details about the available interactions with the replicator.
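The pass-along pattern can be sketched without actors: the request carries an opaque context, the responder copies it into the response, and the requester reads it back, just like the Some(replyTo) given to Replicator.Get in the sample. `GetRequest`, `GetResponse` and `handle` are hypothetical names for this illustration, and a plain callback stands in for the replyTo ActorRef.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of a request carrying an opaque context; NOT the Akka API.
final case class GetRequest[C](key: String, context: C)
final case class GetResponse[C](key: String, value: Option[Long], context: C)

// The responder never inspects the context; it only copies it into the reply.
def handle[C](data: Map[String, Long], req: GetRequest[C]): GetResponse[C] =
  GetResponse(req.key, data.get(req.key), req.context)

// A callback stands in for the replyTo ActorRef.
val replies = ListBuffer.empty[Long]
val replyTo: Long => Unit = v => replies += v

val rsp = handle(Map("counter" -> 7L), GetRequest("counter", replyTo))
// The requester recovers replyTo from the response; no table of pending requests.
rsp.value.foreach(rsp.context)
```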
Replicated data types
Akka contains a set of useful replicated data types, and it is fully possible to implement custom replicated data types. For more details, read the untyped Distributed Data documentation.
Running separate instances of the replicator
For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles, it makes sense to start separate replicators. This needs to be done on all nodes, or on the group of nodes tagged with a specific role. To do this with Typed Distributed Data, you first have to start an untyped Replicator and pass it to the Replicator.behavior method that takes an untyped actor ref. All such Replicators must run on the same path in the untyped actor hierarchy.