Distributed Data

Dependency

To use Akka Cluster Distributed Data Typed, you must add the following dependency in your project:

sbt
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-typed" % "2.5.18"
Maven
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster-typed_2.12</artifactId>
  <version>2.5.18</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.typesafe.akka', name: 'akka-cluster-typed_2.12', version: '2.5.18'
}

Introduction

Akka Distributed Data is useful when you need to share data between nodes in an Akka Cluster. The data is accessed with an actor providing a key-value store like API. The keys are unique identifiers with type information of the data values. The values are Conflict Free Replicated Data Types (CRDTs).

All data entries are spread to all nodes, or nodes with a certain role, in the cluster via direct replication and gossip based dissemination. You have fine grained control of the consistency level for reads and writes.

The nature of CRDTs makes it possible to perform updates from any node without coordination. Concurrent updates from different nodes are automatically resolved by the monotonic merge function that every data type must provide, so the state changes always converge. Several useful data types for counters, sets, maps and registers are provided, and you can also implement your own custom data types.
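
To make the idea of a monotonic merge function concrete, here is a minimal sketch of a grow-only counter in plain Java. It is not Akka's GCounter implementation: the class name GCounterSketch and the use of plain strings as node identifiers are assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified grow-only counter; NOT Akka's GCounter.
final class GCounterSketch {
  // One monotonically increasing count per node id (node ids are plain strings here).
  private final Map<String, Long> counts = new HashMap<>();

  // A node only ever increments its own slot.
  void increment(String nodeId, long delta) {
    if (delta < 0) throw new IllegalArgumentException("a grow-only counter cannot decrease");
    counts.merge(nodeId, delta, Long::sum);
  }

  // The counter value is the sum over all node slots.
  long value() {
    long sum = 0;
    for (long c : counts.values()) sum += c;
    return sum;
  }

  // Merge keeps the per-node maximum, which makes it commutative,
  // associative and idempotent, so replicas converge in any merge order.
  GCounterSketch merge(GCounterSketch other) {
    GCounterSketch merged = new GCounterSketch();
    merged.counts.putAll(this.counts);
    other.counts.forEach((node, count) -> merged.counts.merge(node, count, Long::max));
    return merged;
  }
}
```

If one replica increments its own slot by 2 and another increments its own slot by 3, merging them in either order gives a counter with value 5, and merging the same state again changes nothing.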

It is eventually consistent and geared toward providing high read and write availability (partition tolerance), with low latency. Note that in an eventually consistent system a read may return an out-of-date value.

Using the Replicator

The Replicator actor provides the API for interacting with the data and is accessed through the DistributedData extension.

The messages for the replicator, such as Replicator.Update, are defined in akka.cluster.ddata.typed.scaladsl.Replicator (Scala) or akka.cluster.ddata.typed.javadsl.Replicator (Java), but the actual CRDTs are the same as in untyped, for example akka.cluster.ddata.GCounter. For now this requires an implicit untyped Cluster; we hope to improve this in the future (issue #25746).

The replicator can contain multiple entries, each holding a replicated data type. We therefore need to create a key that identifies the entry and carries the type of its value, and then use that key for every interaction with the replicator. Each replicated data type contains a factory for defining such a key.
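
The idea of a key that carries both an identifier and the type of its value can be sketched in plain Java as follows. The names TypedKeySketch and TypedStoreSketch are hypothetical and are not part of Akka. The sketch only illustrates why a typed key lets callers read an entry back without casting at the call site.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical key type: a unique id plus the type of the value stored under it.
final class TypedKeySketch<T> {
  final String id;
  final Class<T> type;

  TypedKeySketch(String id, Class<T> type) {
    this.id = id;
    this.type = type;
  }
}

// Hypothetical store holding entries of different types under different keys.
final class TypedStoreSketch {
  private final Map<String, Object> entries = new HashMap<>();

  // The key's type parameter ties the value type to the key at compile time.
  <T> void put(TypedKeySketch<T> key, T value) {
    entries.put(key.id, value);
  }

  // The key's type information is used to hand the value back with its static type.
  <T> Optional<T> get(TypedKeySketch<T> key) {
    return Optional.ofNullable(entries.get(key.id)).map(key.type::cast);
  }
}
```

A key created as new TypedKeySketch<>("counter", Integer.class) can then be used for both put and get, and get returns an Optional<Integer> rather than an untyped Object.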

This sample uses the replicated data type GCounter to implement a counter that can be written to on any node of the cluster:

Scala
import akka.actor.Scheduler
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
import akka.cluster.ddata.typed.scaladsl.Replicator
import akka.cluster.ddata.typed.scaladsl.Replicator._
import akka.cluster.ddata.{ GCounter, GCounterKey, ReplicatedData }
import akka.actor.testkit.typed.scaladsl._
import akka.util.Timeout
import com.typesafe.config.ConfigFactory

import scala.concurrent.Future
import scala.concurrent.duration._

sealed trait ClientCommand
final case object Increment extends ClientCommand
final case class GetValue(replyTo: ActorRef[Int]) extends ClientCommand
final case class GetCachedValue(replyTo: ActorRef[Int]) extends ClientCommand
private sealed trait InternalMsg extends ClientCommand
private case class InternalUpdateResponse(rsp: Replicator.UpdateResponse[GCounter]) extends InternalMsg
private case class InternalGetResponse(rsp: Replicator.GetResponse[GCounter]) extends InternalMsg
private case class InternalChanged(chg: Replicator.Changed[GCounter]) extends InternalMsg

val Key = GCounterKey("counter")

def client(replicator: ActorRef[Replicator.Command])(implicit cluster: Cluster): Behavior[ClientCommand] =
  Behaviors.setup[ClientCommand] { ctx ⇒

    // The distributed data types still need the implicit untyped Cluster.
    // We will look into another solution for that.

    val updateResponseAdapter: ActorRef[Replicator.UpdateResponse[GCounter]] =
      ctx.messageAdapter(InternalUpdateResponse.apply)

    val getResponseAdapter: ActorRef[Replicator.GetResponse[GCounter]] =
      ctx.messageAdapter(InternalGetResponse.apply)

    val changedAdapter: ActorRef[Replicator.Changed[GCounter]] =
      ctx.messageAdapter(InternalChanged.apply)

    replicator ! Replicator.Subscribe(Key, changedAdapter)

    def behavior(cachedValue: Int): Behavior[ClientCommand] = {
      Behaviors.receive[ClientCommand] { (ctx, msg) ⇒
        msg match {
          case Increment ⇒
            replicator ! Replicator.Update(Key, GCounter.empty, Replicator.WriteLocal, updateResponseAdapter)(_ + 1)
            Behaviors.same

          case GetValue(replyTo) ⇒
            replicator ! Replicator.Get(Key, Replicator.ReadLocal, getResponseAdapter, Some(replyTo))
            Behaviors.same

          case GetCachedValue(replyTo) ⇒
            // answer from the locally cached value, without asking the replicator
            replyTo ! cachedValue
            Behaviors.same

          case internal: InternalMsg ⇒ internal match {
            case InternalUpdateResponse(_) ⇒ Behaviors.same // ok

            case InternalGetResponse(rsp @ Replicator.GetSuccess(Key, Some(replyTo: ActorRef[Int] @unchecked))) ⇒
              val value = rsp.get(Key).value.toInt
              replyTo ! value
              Behaviors.same

            case InternalGetResponse(rsp) ⇒
              Behaviors.unhandled // not dealing with failures

            case InternalChanged(chg @ Replicator.Changed(Key)) ⇒
              val value = chg.get(Key).value.intValue
              behavior(value)
          }
        }
      }
    }

    behavior(cachedValue = 0)
  }
Java
import java.util.Optional;
import akka.actor.typed.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.GCounter;
import akka.cluster.ddata.GCounterKey;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ReplicatedData;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.javadsl.TestKit;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
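import akka.cluster.ddata.typed.javadsl.DistributedData;
import akka.cluster.ddata.typed.javadsl.Replicator;
import akka.cluster.ddata.typed.javadsl.Replicator.Command;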
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Receive;

  interface ClientCommand { }

  enum Increment implements ClientCommand {
    INSTANCE
  }

  static final class GetValue implements ClientCommand {
    final ActorRef<Integer> replyTo;

    GetValue(ActorRef<Integer> replyTo) {
      this.replyTo = replyTo;
    }
  }

  static final class GetCachedValue implements ClientCommand {
    final ActorRef<Integer> replyTo;

    GetCachedValue(ActorRef<Integer> replyTo) {
      this.replyTo = replyTo;
    }
  }

  private interface InternalMsg extends ClientCommand { }

  private static final class InternalUpdateResponse implements InternalMsg {
    final Replicator.UpdateResponse<GCounter> rsp;

    InternalUpdateResponse(Replicator.UpdateResponse<GCounter> rsp) {
      this.rsp = rsp;
    }
  }

  private static final class InternalGetResponse implements InternalMsg {
    final Replicator.GetResponse<GCounter> rsp;

    InternalGetResponse(Replicator.GetResponse<GCounter> rsp) {
      this.rsp = rsp;
    }
  }

  private static final class InternalChanged implements InternalMsg {
    final Replicator.Changed<GCounter> chg;

    InternalChanged(Replicator.Changed<GCounter> chg) {
      this.chg = chg;
    }
  }

  static final Key<GCounter> Key = GCounterKey.create("counter");

  static class Counter extends AbstractBehavior<ClientCommand> {
    private final ActorRef<Replicator.Command> replicator;
    private final Cluster node;
    final ActorRef<Replicator.UpdateResponse<GCounter>> updateResponseAdapter;
    final ActorRef<Replicator.GetResponse<GCounter>> getResponseAdapter;
    final ActorRef<Replicator.Changed<GCounter>> changedAdapter;

    private int cachedValue = 0;

    Counter(ActorRef<Command> replicator, Cluster node, ActorContext<ClientCommand> ctx) {
      this.replicator = replicator;
      this.node = node;

      // adapters turning the messages from the replicator
      // into our own protocol
      updateResponseAdapter = ctx.messageAdapter(
          (Class<Replicator.UpdateResponse<GCounter>>) (Object) Replicator.UpdateResponse.class,
          msg -> new InternalUpdateResponse(msg));

      getResponseAdapter = ctx.messageAdapter(
          (Class<Replicator.GetResponse<GCounter>>) (Object) Replicator.GetResponse.class,
          msg -> new InternalGetResponse(msg));

      changedAdapter = ctx.messageAdapter(
          (Class<Replicator.Changed<GCounter>>) (Object) Replicator.Changed.class,
          msg -> new InternalChanged(msg));

      replicator.tell(new Replicator.Subscribe<>(Key, changedAdapter));
    }

    public static Behavior<ClientCommand> create() {
      return Behaviors.setup((ctx) -> {
        // The distributed data types still need the implicit untyped Cluster.
        // We will look into another solution for that.
        Cluster node = Cluster.get(Adapter.toUntyped(ctx.getSystem()));
        ActorRef<Replicator.Command> replicator = DistributedData.get(ctx.getSystem()).replicator();

        return new Counter(replicator, node, ctx);
      });
    }


    @Override
    public Receive<ClientCommand> createReceive() {
      return receiveBuilder()
        .onMessage(Increment.class, this::onIncrement)
        .onMessage(InternalUpdateResponse.class, msg -> Behaviors.same())
        .onMessage(GetValue.class, this::onGetValue)
        .onMessage(GetCachedValue.class, this::onGetCachedValue)
        .onMessage(InternalGetResponse.class, this::onInternalGetResponse)
        .onMessage(InternalChanged.class, this::onInternalChanged)
        .build();
    }

    private Behavior<ClientCommand> onIncrement(Increment cmd) {
      replicator.tell(
        new Replicator.Update<>(
          Key,
          GCounter.empty(),
          Replicator.writeLocal(),
          updateResponseAdapter,
          curr -> curr.increment(node, 1)));
      return Behaviors.same();
    }

    private Behavior<ClientCommand> onGetValue(GetValue cmd) {
      replicator.tell(
        new Replicator.Get<>(Key, Replicator.readLocal(), getResponseAdapter, Optional.of(cmd.replyTo)));
      return Behaviors.same();
    }

    private Behavior<ClientCommand> onGetCachedValue(GetCachedValue cmd) {
      cmd.replyTo.tell(cachedValue);
      return Behaviors.same();
    }

    private Behavior<ClientCommand> onInternalGetResponse(InternalGetResponse msg) {
      if (msg.rsp instanceof Replicator.GetSuccess) {
        int value = ((Replicator.GetSuccess<?>) msg.rsp).get(Key).getValue().intValue();
        ActorRef<Integer> replyTo = (ActorRef<Integer>) msg.rsp.request().get();
        replyTo.tell(value);
        return Behaviors.same();
      } else {
        // not dealing with failures
        return Behaviors.unhandled();
      }
    }

    private Behavior<ClientCommand> onInternalChanged(InternalChanged msg) {
      GCounter counter = msg.chg.get(Key);
      cachedValue = counter.getValue().intValue();
      return this;
    }

  }

When we start up the actor, we subscribe it to changes for our key. Whenever the replicator sees a change for the counter, our actor receives a Replicator.Changed[GCounter] (Scala) or Replicator.Changed<GCounter> (Java). Since this is not a message in our protocol, we use an adapter to wrap it in the internal InternalChanged message, which is then handled in the regular message handling of the behavior.

For an incoming Increment command, we send the replicator a Replicator.Update request. It contains five values:

  1. the Key we want to update
  2. the data to use as the empty state if the replicator has not seen the key before
  3. the consistency level we want for the update
  4. an ActorRef[Replicator.UpdateResponse[GCounter]] (Scala) or ActorRef<Replicator.UpdateResponse<GCounter>> (Java) to respond to when the update is completed
  5. a function that takes the previous state and updates it, in our case by incrementing it by 1
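
How the key, the empty state and the modify function fit together can be sketched in plain Java. This is an illustration of the semantics described in the list, not the replicator's actual implementation: the class LocalUpdateSketch is hypothetical, and the consistency level and the response ActorRef are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical, local-only stand-in for the "update" step; no replication involved.
final class LocalUpdateSketch<V> {
  private final Map<String, V> entries = new HashMap<>();

  // Applies `modify` to the current value for `key`, or to `initial`
  // if the key has not been seen before, then stores and returns the result.
  V update(String key, V initial, UnaryOperator<V> modify) {
    V current = entries.getOrDefault(key, initial);
    V updated = modify.apply(current);
    entries.put(key, updated);
    return updated;
  }
}
```

Calling update("counter", 0, v -> v + 1) twice on a LocalUpdateSketch<Integer> first starts from the supplied empty state and then from the stored value, so the initial value is only used the first time.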

Whenever the distributed counter is updated, we cache the value so that we can answer GetCachedValue requests without the extra interaction with the replicator.

We also support asking the replicator directly, using the GetValue command. This demonstrates how many of the replicator commands take a pass-along value that is put in the response message. We therefore do not need to keep local state tracking which actors are waiting for responses; instead we extract the replyTo actor from the replicator's response when it replies with a GetSuccess. See the untyped Distributed Data documentation for more details about the available interactions with the replicator.
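
The pass-along pattern can be sketched independently of Akka in plain Java. All names below (GetRequestSketch, GetResultSketch, EchoingStoreSketch, PassAlongClientSketch) are hypothetical, and a java.util.function.Consumer stands in for the replyTo actor reference. The point is only that the requester needs no table of pending requests, because the reply target comes back inside the response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical request: a key plus an optional context that is opaque to the store.
final class GetRequestSketch<C> {
  final String key;
  final Optional<C> context;

  GetRequestSketch(String key, Optional<C> context) {
    this.key = key;
    this.context = context;
  }
}

// Hypothetical response: the value plus the context copied from the request.
final class GetResultSketch<C> {
  final long value;
  final Optional<C> context;

  GetResultSketch(long value, Optional<C> context) {
    this.value = value;
    this.context = context;
  }
}

final class EchoingStoreSketch {
  private final Map<String, Long> values = new HashMap<>();

  void put(String key, long value) {
    values.put(key, value);
  }

  // The store never inspects the context; it only echoes it back in the result.
  <C> GetResultSketch<C> get(GetRequestSketch<C> request) {
    return new GetResultSketch<>(values.getOrDefault(request.key, 0L), request.context);
  }
}

final class PassAlongClientSketch {
  // Stateless handler: the reply target is taken from the response itself.
  static void onResult(GetResultSketch<Consumer<Long>> result) {
    result.context.ifPresent(replyTo -> replyTo.accept(result.value));
  }
}
```

In this sketch the Consumer passed as context plays the role that Some(replyTo) or Optional.of(cmd.replyTo) plays in the Replicator.Get request of the sample.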

Replicated data types

Akka contains a set of useful replicated data types, and it is fully possible to implement custom replicated data types. For more details, read the untyped Distributed Data documentation.

Running separate instances of the replicator

For some use cases, for example when limiting the replicator to certain roles, or using different subsets on different roles, it makes sense to start separate replicators. This needs to be done on all nodes, or on the group of nodes tagged with a specific role. To do this with Typed Distributed Data you first have to start an untyped Replicator and pass it to the Replicator.behavior method that takes an untyped actor ref. All such Replicators must run on the same path in the untyped actor hierarchy.
