Distributed Data


To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project:

libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.6-SNAPSHOT"
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.6-SNAPSHOT'


Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).

All data entries are spread to all nodes, or nodes with a certain role, in the cluster via direct replication and gossip based dissemination. You have fine grained control of the consistency level for reads and writes.

The nature CRDTs makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes will automatically be resolved by the monotonic merge function, which all data types must provide. The state changes always converge. Several useful data types for counters, sets, maps and registers are provided and you can also implement your own custom data types.

It is eventually consistent and geared toward providing high read and write availability (partition tolerance), with low latency. Note that in an eventually consistent system a read may return an out-of-date value.

Using the Replicator

The ReplicatorReplicator actor provides the API for interacting with the data and is accessed through the extension DistributedDataDistributedData.

The messages for the replicator, such as Replicator.Update are defined in ReplicatorReplicator but the actual CRDTs are the same as in untyped, for example akka.cluster.ddata.GCounter. This will require a implicit akka.cluster.ddata.SelfUniqueAddress.SelfUniqueAddress, available from implicit val node = DistributedData(system).selfUniqueAddressSelfUniqueAddress node = DistributedData.get(system).selfUniqueAddress();.

The replicator can contain multiple entries each containing a replicated data type, we therefore need to create a key identifying the entry and helping us know what type it has, and then use that key for every interaction with the replicator. Each replicated data type contains a factory for defining such a key.

This sample uses the replicated data type GCounter to implement a counter that can be written to on any node of the cluster:

import akka.actor.typed.Scheduler
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ddata.typed.scaladsl.Replicator._
import akka.cluster.ddata.{ GCounter, GCounterKey }
import akka.actor.testkit.typed.scaladsl._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.duration._

sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter], replyTo: ActorRef[Int])
    extends InternalMsg
private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg

def client(key: GCounterKey): Behavior[ClientCommand] =
  Behaviors.setup[ClientCommand] { ctx =>
    implicit val node: SelfUniqueAddress = DistributedData(ctx.system).selfUniqueAddress

    // adapter that turns the response messages from the replicator into our own protocol
    DistributedData.withReplicatorMessageAdapter[ClientCommand, GCounter] { replicatorAdapter =>
      replicatorAdapter.subscribe(key, InternalChanged.apply)

      def behavior(cachedValue: Int): Behavior[ClientCommand] = {
        Behaviors.receiveMessage[ClientCommand] {
          case Increment =>
              askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),


          case GetValue(replyTo) =>
              askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
              value => InternalGetResponse(value, replyTo))


          case GetCachedValue(replyTo) =>
            replyTo ! cachedValue

          case internal: InternalMsg =>
            internal match {
              case InternalUpdateResponse(_) => Behaviors.same // ok

              case InternalGetResponse(rsp @ Replicator.GetSuccess(`key`), replyTo) =>
                val value = rsp.get(key).value.toInt
                replyTo ! value

              case InternalGetResponse(_, _) =>
                Behaviors.unhandled // not dealing with failures

              case InternalChanged(chg @ Replicator.Changed(`key`)) =>
                val value = chg.get(key).value.intValue

      behavior(cachedValue = 0)
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Receive;

import static org.junit.Assert.assertEquals;

interface ClientCommand {}

enum Increment implements ClientCommand {

static final class GetValue implements ClientCommand {
  final ActorRef<Integer> replyTo;

  GetValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;

static final class GetCachedValue implements ClientCommand {
  final ActorRef<Integer> replyTo;

  GetCachedValue(ActorRef<Integer> replyTo) {
    this.replyTo = replyTo;

private interface InternalMsg extends ClientCommand {}

private static final class InternalUpdateResponse implements InternalMsg {
  final Replicator.UpdateResponse<GCounter> rsp;

  InternalUpdateResponse(Replicator.UpdateResponse<GCounter> rsp) {
    this.rsp = rsp;

private static final class InternalGetResponse implements InternalMsg {
  final Replicator.GetResponse<GCounter> rsp;
  final ActorRef<Integer> replyTo;

  InternalGetResponse(Replicator.GetResponse<GCounter> rsp, ActorRef<Integer> replyTo) {
    this.rsp = rsp;
    this.replyTo = replyTo;

private static final class InternalChanged implements InternalMsg {
  final Replicator.Changed<GCounter> chg;

  InternalChanged(Replicator.Changed<GCounter> chg) {
    this.chg = chg;

static class Counter extends AbstractBehavior<ClientCommand> {
  private final ActorContext<ClientCommand> context;
  // adapter that turns the response messages from the replicator into our own protocol
  private final ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter;
  private final SelfUniqueAddress node;
  private final Key<GCounter> key;

  private int cachedValue = 0;

      ActorContext<ClientCommand> ctx,
      ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter,
      Key<GCounter> key) {

    context = ctx;
    this.replicatorAdapter = replicatorAdapter;
    this.key = key;

    node = DistributedData.get(ctx.getSystem()).selfUniqueAddress();

    this.replicatorAdapter.subscribe(this.key, InternalChanged::new);

  public static Behavior<ClientCommand> create(Key<GCounter> key) {
    return Behaviors.setup(
        ctx ->
                (ReplicatorMessageAdapter<ClientCommand, GCounter> replicatorAdapter) ->
                    new Counter(ctx, replicatorAdapter, key)));

  public Receive<ClientCommand> createReceive() {
    return newReceiveBuilder()
        .onMessage(Increment.class, this::onIncrement)
        .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
        .onMessage(GetValue.class, this::onGetValue)
        .onMessage(GetCachedValue.class, this::onGetCachedValue)
        .onMessage(InternalGetResponse.class, this::onInternalGetResponse)
        .onMessage(InternalChanged.class, this::onInternalChanged)

  private Behavior<ClientCommand> onIncrement(Increment cmd) {
        askReplyTo ->
            new Replicator.Update<>(
                curr -> curr.increment(node, 1)),

    return Behaviors.same();

  private Behavior<ClientCommand> onGetValue(GetValue cmd) {
        askReplyTo -> new Replicator.Get<>(key, Replicator.readLocal(), askReplyTo),
        rsp -> new InternalGetResponse(rsp, cmd.replyTo));

    return Behaviors.same();

  private Behavior<ClientCommand> onGetCachedValue(GetCachedValue cmd) {
    return Behaviors.same();

  private Behavior<ClientCommand> onInternalGetResponse(InternalGetResponse msg) {
    if (msg.rsp instanceof Replicator.GetSuccess) {
      int value = ((Replicator.GetSuccess<?>) msg.rsp).get(key).getValue().intValue();
      return Behaviors.same();
    } else {
      // not dealing with failures
      return Behaviors.unhandled();

  private Behavior<ClientCommand> onInternalChanged(InternalChanged msg) {
    GCounter counter = msg.chg.get(key);
    cachedValue = counter.getValue().intValue();
    return this;

Although you can interact with the Replicator using the ActorRef[Replicator.Command]ActorRef<Replicator.Command> from DistributedData(ctx.system).replicatorDistributedData(ctx.getSystem()).replicator() it’s often more convenient to use the ReplicatorMessageAdapter as in the above example.

When we start up the actor we subscribe it to changes for our key, meaning whenever the replicator observes a change for the counter our actor will receive a Replicator.Changed[GCounter]Replicator.Changed<GCounter>. Since this is not a message in our protocol, we use a message transformation function to wrap it in the internal InternalChanged message, which is then handled in the regular message handling of the behavior.

For an incoming Increment command, we send the replicator a Replicator.Update request, it contains five values:

  1. the KeyKEY we want to update
  2. the data to use if as the empty state if the replicator has not seen the key before
  3. the consistency level we want for the update
  4. an ActorRef[Replicator.UpdateResponse[GCounter]]ActorRef<Replicator.UpdateResponse<GCounter>> to respond to when the update is completed
  5. a function that takes a previous state and updates it, in our case by incrementing it with 1

Whenever the distributed counter is updated, we cache the value so that we can answer requests about the value without the extra interaction with the replicator using the GetCachedValue command.

The example also supports asking the replicator using the GetValue command. Note how the replyTo from the incoming message can be used when the GetSuccess response from the replicator is received.

See the the untyped Distributed Data documentation for more details about Get, Update and Delete interactions with the replicator.

There is alternative way of constructing the function for the Update message:

// alternative way to define the `createRequest` function
// Replicator.Update instance has a curried `apply` method
  Replicator.Update(key, GCounter.empty, Replicator.WriteLocal)(_ :+ 1),

// that is the same as
  askReplyTo => Replicator.Update(key, GCounter.empty, Replicator.WriteLocal, askReplyTo)(_ :+ 1),

Similar is supported for Get and Delete:

// alternative way to define the `createRequest` function
// Replicator.Get instance has a curried `apply` method
replicatorAdapter.askGet(Replicator.Get(key, Replicator.ReadLocal), value => InternalGetResponse(value, replyTo))

// that is the same as
  askReplyTo => Replicator.Get(key, Replicator.ReadLocal, askReplyTo),
  value => InternalGetResponse(value, replyTo))

Replicated data types

Akka contains a set of useful replicated data types and it is fully possible to implement custom replicated data types. For more details, read the untyped Distributed Data documentation

Running separate instances of the replicator

For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles, it makes sense to start separate replicators, this needs to be done on all nodes, or the group of nodes tagged with a specific role. To do this with the Typed Distributed Data you will first have to start an untyped Replicator and pass it to the Replicator.behavior method that takes an untyped actor ref. All such Replicators must run on the same path in the untyped actor hierarchy.

A standalone ReplicatorMessageAdapter can also be created for a given Replicator instead of creating one via the DistributedData extension.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.