Replicated Event Sourcing replication via direct access to replica databases

Note

Since Akka 2.8.0 a gRPC based transport is the recommended way to set up the replication of events between the replicas.

It is possible to consume events with a direct connection to the database backing each replica. Such a setup is generally harder to set up and secure, and is less feasible unless the replication is over a private network.

To enable an entity for Replicated Event Sourcing let it extend ReplicatedEventSourcedBehavior instead of EventSourcedBehavior and use the factory methods on akka.persistence.typed.scaladsl.ReplicatedEventSourcingakka.persistence.typed.javadsl.ReplicatedEventSourcing.

All replicas need to be known up front:

Scala
sourceval DCA = ReplicaId("DC-A")
val DCB = ReplicaId("DC-B")
val AllReplicas = Set(DCA, DCB)
Java
sourcepublic static final ReplicaId DCA = new ReplicaId("DCA");
public static final ReplicaId DCB = new ReplicaId("DCB");

public static final Set<ReplicaId> ALL_REPLICAS =
    Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB)));

Then to enable replication create the event sourced behavior with the factory method:

Scala
sourcedef apply(
    system: ActorSystem[_],
    entityId: String,
    replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
  ReplicatedEventSourcing.perReplicaJournalConfig(
    ReplicationId("MyReplicatedEntity", entityId, replicaId),
    Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { replicationContext =>
    EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
  }
}
Java
sourcepublic class MyReplicatedBehavior
    extends ReplicatedEventSourcedBehavior<
        MyReplicatedBehavior.Command, MyReplicatedBehavior.Event, MyReplicatedBehavior.State> {
  public static Behavior<Command> create(String entityId, ReplicaId replicaId) {
    Map<ReplicaId, String> allReplicasAndQueryPlugins = new HashMap<>();
    allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
    allReplicasAndQueryPlugins.put(DCB, "journalForDCB");

    return ReplicatedEventSourcing.perReplicaJournalConfig(
        new ReplicationId("MyReplicatedEntity", entityId, replicaId),
        allReplicasAndQueryPlugins,
        MyReplicatedBehavior::new);
  }

  private MyReplicatedBehavior(ReplicationContext replicationContext) {
    super(replicationContext);
  }

The factory takes in:

  • entityId: this will be used as part of the underlying persistenceId
  • replicaId: Which replica this instance is
  • allReplicasAndQueryPlugins: All Replicas and the query plugin used to read their events
  • A factory function to create an instance of the EventSourcedBehaviorReplicatedEventSourcedBehavior

In this scenario each replica reads from each other’s database effectively providing cross region replication for any database that has an Akka Persistence plugin. Alternatively if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the withSharedJournal factory can be used.

Scala
sourcedef apply(
    system: ActorSystem[_],
    entityId: String,
    replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
  ReplicatedEventSourcing.commonJournalConfig(
    ReplicationId("MyReplicatedEntity", entityId, replicaId),
    AllReplicas,
    queryPluginId) { replicationContext =>
    EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
  }
}
Java
sourcepublic static Behavior<Command> create(
    String entityId, ReplicaId replicaId, String queryPluginId) {
  return ReplicatedEventSourcing.commonJournalConfig(
      new ReplicationId("MyReplicatedEntity", entityId, replicaId),
      ALL_REPLICAS,
      queryPluginId,
      MyReplicatedBehavior::new);
}

The function passed to both factory methods return an EventSourcedBehavior and provide access to the ReplicationContextReplicationContext that has the following methods:

  • entityId
  • replicaId
  • allReplicas
  • persistenceId - to provide to the EventSourcedBehavior factory. This must be used.

As well as methods that can only be used in the event handler. The values these methods return relate to the event that is being processed.

The function passed to both factory methods is invoked with a special ReplicationContextReplicationContext that needs to be passed to the concrete ReplicatedEventSourcedBehavior and on to the super constructor.

The context gives access to:

  • entityId
  • replicaId
  • allReplicas
  • persistenceId

As well as methods that can only be used in the event handler, accessed through getReplicationContext. The values these methods return relate to the event that is being processed.

  • origin: The ReplicaId that originally created the event
  • concurrent: Whether the event was concurrent with another event as in the second diagram above
  • recoveryRunning: Whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once.
  • currentTimeMillis: similar to System.currentTimeMillis but guaranteed never to go backwards

The factory returns a Behavior that can be spawned like any other behavior.

Sharded Replicated Event Sourced entities

There are three ways to integrate replicated event sourced entities with sharding:

  • Ensure that each replica has a unique entity id by using the replica id as part of the entity id
  • Use multi datacenter to run a full copy of sharding per replica
  • Use roles to run a full copy of sharding per replica

To simplify all three cases the ReplicatedShardingExtensionReplicatedShardingExtension is available from the akka-cluster-sharding-typed module.

Scala
sourceReplicatedEntityProvider[Command]("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
  (entityTypeKey, replicaId) =>
    ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
      // the sharding entity id contains the business entityId, entityType, and replica id
      // which you'll need to create a ReplicatedEventSourcedBehavior
      val replicationId = ReplicationId.fromString(entityContext.entityId)
      MyEventSourcedBehavior(replicationId)
    })
}
Java
sourcereturn ReplicatedEntityProvider.create(
    Command.class,
    "MyReplicatedType",
    ALL_REPLICAS,
    (entityTypeKey, replicaId) ->
        ReplicatedEntity.create(
            replicaId,
            Entity.of(
                entityTypeKey,
                entityContext ->
                    myEventSourcedBehavior(
                        ReplicationId.fromString(entityContext.getEntityId())))));

This will run an instance of sharding and per replica and each entity id contains the replica id and the type name. Replicas could be on the same node if they end up in the same shard or if the shards get allocated to the same node.

To prevent this roles can be used. You could for instance add a cluster role per availability zone / rack and have a replica per rack.

Scala
sourceval provider = ReplicatedEntityProvider.perRole("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
  replicationId =>
    MyEventSourcedBehavior(replicationId)
}
Java
sourcereturn ReplicatedEntityProvider.create(
    Command.class,
    "MyReplicatedType",
    ALL_REPLICAS,
    (entityTypeKey, replicaId) ->
        ReplicatedEntity.create(
            replicaId,
            Entity.of(
                    entityTypeKey,
                    entityContext ->
                        myEventSourcedBehavior(
                            ReplicationId.fromString(entityContext.getEntityId())))
                .withRole(replicaId.id())));

Lastly if your Akka Cluster is setup across DCs you can run a replica per DC.

Scala
sourceReplicatedEntityProvider.perDataCenter("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) { replicationId =>
  MyEventSourcedBehavior(replicationId)
}
Java
sourceReplicatedEntityProvider.create(
    Command.class,
    "MyReplicatedType",
    ALL_REPLICAS,
    (entityTypeKey, replicaId) ->
        ReplicatedEntity.create(
            replicaId,
            Entity.of(
                    entityTypeKey,
                    entityContext ->
                        myEventSourcedBehavior(
                            ReplicationId.fromString(entityContext.getEntityId())))
                .withDataCenter(replicaId.id())));

Regardless of which replication strategy you use sending messages to the replicated entities is the same.

init returns an ReplicatedShardingReplicatedSharding instance which gives access to EntityRefEntityRefs for each of the replicas for arbitrary routing logic:

Scala
sourceval myReplicatedSharding: ReplicatedSharding[Command] =
  ReplicatedShardingExtension(system).init(provider)

val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId")
Java
sourceReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system);

ReplicatedSharding<Command> replicatedSharding = extension.init(provider());

Map<ReplicaId, EntityRef<Command>> myEntityId =
    replicatedSharding.getEntityRefsFor("myEntityId");

More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release #29281, #29319).

Tagging events and running projections

Just like for regular EventSourcedBehaviors it is possible to tag events along with persisting them. This is useful for later retrival of events for a given tag. The same API for tagging provided for EventSourcedBehavior can be used for replicated event sourced behaviors as well. Tagging is useful in practice to build queries that lead to other data representations or aggregations of these event streams that can more directly serve user queries – known as building the “read side” in CQRS based applications.

Creating read side projections is possible through Akka Projections or through direct usage of the events by tag queries.

The tagging is invoked in each replicas, which requires some special care in using tags, or else the same event will be tagged one time for each replica and show up in the event by tag stream one time for each replica. In addition to this the tags will be written in the respective journal of the replicas, which means that unless they all share a single journal the tag streams will be local to the replica even if the same tag is used on multiple replicas.

One strategy for dealing with this is to include the replica id in the tag name, this means there will be a tagged stream of events per replica that contains all replicated events, but since the events can arrive in different order, they can also come in different order per replica tag.

Another strategy would be to tag only the events that are local to the replica and not events that are replicated. Either using a tag that will be the same for all replicas, leading to a single stream of tagged events where the events from each replica is present only once, or with a tag including the replica id meaning that there will be a stream of tagged events with the events accepted locally for each replica.

Determining the replica id of the replicated actor itself and the origin replica id of an event is possible through the ReplicationContextReplicationContext when the tagger callback is invoked like this:

Scala
sourceReplicatedEventSourcing.commonJournalConfig(
  ReplicationId("TaggingSpec", entityId, replica),
  allReplicas,
  queryPluginId)(
  replicationContext =>
    EventSourcedBehavior[Command, String, State](
      replicationContext.persistenceId,
      State(Set.empty),
      (state, command) =>
        command match {
          case Add(string, ack) =>
            if (state.strings.contains(string)) Effect.none.thenRun(_ => ack ! Done)
            else Effect.persist(string).thenRun(_ => ack ! Done)
          case GetStrings(replyTo) =>
            replyTo ! state.strings
            Effect.none
        },
      (state, event) => state.copy(strings = state.strings + event))
    // use withTagger to define tagging logic
      .withTagger(
        event =>
          // don't apply tags if event was replicated here, it already will appear in queries by tag
          // as the origin replica would have tagged it already
          if (replicationContext.origin != replicationContext.replicaId) Set.empty
          else if (event.length > 10) Set("long-strings", "strings")
          else Set("strings")))
Java
source@Override
public Set<String> tagsFor(String event) {
  // don't apply tags if event was replicated here, it already will appear in queries by tag
  // as the origin replica would have tagged it already
  if (getReplicationContext().replicaId() != getReplicationContext().origin()) {
    return new HashSet<>();
  } else {
    Set<String> tags = new HashSet<>();
    tags.add("strings");
    if (event.length() > 10) tags.add("long-strings");
    return tags;
  }
}

In this sample we are using a shared journal, and single tag but only tagging local events to it and therefore ending up with a single stream of tagged events from all replicas without duplicates.

Direct Replication of Events

Each replica will read the events from all the other copies from the database. When used with Cluster Sharding, and to make the sharing of events with other replicas more efficient, each replica publishes the events across the Akka cluster directly to other replicas. The delivery of events across the cluster is not guaranteed so the query to the journal is still needed but can be configured to poll the database less often since most events will arrive at the replicas through the cluster.

The direct replication of events feature is enabled by default when using Cluster Sharding. To disable this feature you first need to:

  1. disable event publishing on the EventSourcedBehavior with withEventPublishing(false)overriding withEventPublishing from ReplicatedEventSourcedBehavior to return false , and then
  2. disable direct replication through withDirectReplication(false) on ReplicatedEntityProviderReplicatedEntityProvider

The “event publishing” feature publishes each event to the local system event bus as a side effect after it has been written.

Examples

More examples can be found in Replicated Event Sourcing Examples

Journal Support

For a journal plugin to support replication it needs to store and read metadata for each event if it is defined in the metadata field. To attach the metadata after writing it, PersistentRepr.withMetadata is used. The JournalSpecJournalSpec in the Persistence TCK provides a capability flag supportsMetadata to toggle verification that metadata is handled correctly.

For a snapshot plugin to support replication it needs to store and read metadata for the snapshot if it is defined in the metadata field. To attach the metadata when reading the snapshot the akka.persistence.SnapshotMetadata.apply factory overload taking a metadata parameter is used. The SnapshotStoreSpecSnapshotStoreSpec in the Persistence TCK provides a capability flag supportsMetadata to toggle verification that metadata is handled correctly.

The following plugins support Replicated Event Sourcing:

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.