Replicated Event Sourcing replication via direct access to replica databases
Since Akka 2.8.0, a gRPC-based transport is the recommended way to set up the replication of events between the replicas.
It is also possible to consume events through a direct connection to the database backing each replica. Such a setup is generally harder to configure and secure, and is less feasible unless the replication happens over a private network.
To enable an entity for Replicated Event Sourcing, use the factory methods on akka.persistence.typed.scaladsl.ReplicatedEventSourcing (Scala) or akka.persistence.typed.javadsl.ReplicatedEventSourcing (Java). In Java, the entity must also extend ReplicatedEventSourcedBehavior instead of EventSourcedBehavior.
All replicas need to be known up front:
- Scala

    val DCA = ReplicaId("DC-A")
    val DCB = ReplicaId("DC-B")
    val AllReplicas = Set(DCA, DCB)
- Java

    public static final ReplicaId DCA = new ReplicaId("DCA");
    public static final ReplicaId DCB = new ReplicaId("DCB");
    public static final Set<ReplicaId> ALL_REPLICAS =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(DCA, DCB)));
Then, to enable replication, create the event sourced behavior with the factory method:
- Scala

    def apply(
        system: ActorSystem[_],
        entityId: String,
        replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
      ReplicatedEventSourcing.perReplicaJournalConfig(
        ReplicationId("MyReplicatedEntity", entityId, replicaId),
        // the query plugin used to read the events of each replica
        Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { replicationContext =>
        // the persistenceId from the replication context must be used
        EventSourcedBehavior[Command, State, Event](replicationContext.persistenceId, ???, ???, ???)
      }
    }
- Java

    public class MyReplicatedBehavior
        extends ReplicatedEventSourcedBehavior<
            MyReplicatedBehavior.Command, MyReplicatedBehavior.Event, MyReplicatedBehavior.State> {

      public static Behavior<Command> create(String entityId, ReplicaId replicaId) {
        // the query plugin used to read the events of each replica
        Map<ReplicaId, String> allReplicasAndQueryPlugins = new HashMap<>();
        allReplicasAndQueryPlugins.put(DCA, "journalForDCA");
        allReplicasAndQueryPlugins.put(DCB, "journalForDCB");

        return ReplicatedEventSourcing.perReplicaJournalConfig(
            new ReplicationId("MyReplicatedEntity", entityId, replicaId),
            allReplicasAndQueryPlugins,
            MyReplicatedBehavior::new);
      }

      private MyReplicatedBehavior(ReplicationContext replicationContext) {
        super(replicationContext);
      }

      // Command, Event and State types, emptyState(), commandHandler()
      // and eventHandler() are omitted from this excerpt
    }
The factory takes in:
- entityId: used as part of the underlying persistenceId
- replicaId: which replica this instance is
- allReplicasAndQueryPlugins: all replicas and the query plugin used to read their events
- a factory function to create an instance of the EventSourcedBehavior (Scala) or ReplicatedEventSourcedBehavior (Java)

The entityId and replicaId are passed together, along with an entity type name, as a ReplicationId.
In this scenario each replica reads from the other replicas' databases, effectively providing cross-region replication for any database that has an Akka Persistence plugin. Alternatively, if all the replicas use the same journal, e.g. for testing or if it is a distributed database such as Cassandra, the commonJournalConfig factory can be used.
- Scala

    def apply(
        system: ActorSystem[_],
        entityId: String,
        replicaId: ReplicaId,
        queryPluginId: String): EventSourcedBehavior[Command, State, Event] = {
      ReplicatedEventSourcing.commonJournalConfig(
        ReplicationId("MyReplicatedEntity", entityId, replicaId),
        AllReplicas,
        // every replica is read through the same query plugin
        queryPluginId) { replicationContext =>
        EventSourcedBehavior[Command, State, Event](replicationContext.persistenceId, ???, ???, ???)
      }
    }
- Java

    public static Behavior<Command> create(
        String entityId, ReplicaId replicaId, String queryPluginId) {
      return ReplicatedEventSourcing.commonJournalConfig(
          new ReplicationId("MyReplicatedEntity", entityId, replicaId),
          ALL_REPLICAS,
          queryPluginId,
          MyReplicatedBehavior::new);
    }
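Conceptually, a shared journal is the special case of the per-replica configuration in which every replica maps to the same query plugin. The sketch below illustrates that relationship with plain collections. The class and method names are hypothetical, a type parameter stands in for ReplicaId so that it compiles without Akka on the classpath, and whether commonJournalConfig is implemented exactly this way is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Akka: expands one query plugin id into the
// replica -> query plugin map shape used by perReplicaJournalConfig.
final class QueryPluginMapping {

  private QueryPluginMapping() {}

  // R stands in for ReplicaId. Every replica reads the others' events
  // through the same query plugin.
  static <R> Map<R, String> sharedJournal(Set<R> allReplicas, String queryPluginId) {
    Map<R, String> mapping = new HashMap<>();
    for (R replica : allReplicas) {
      mapping.put(replica, queryPluginId);
    }
    return Collections.unmodifiableMap(mapping);
  }
}
```

With two replicas, sharedJournal yields a two-entry map whose values are both the given plugin id, which is the same information a per-replica map would carry if both journals were the same.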
The function passed to both factory methods is given a ReplicationContext. In Scala, the function returns an EventSourcedBehavior. In Java, the function is invoked with a special ReplicationContext that needs to be passed to the concrete ReplicatedEventSourcedBehavior and on to the super constructor.
The context gives access to:
- entityId
- replicaId
- allReplicas
- persistenceId: in Scala, provide this to the EventSourcedBehavior factory. It must be used.

The context also has methods that can only be used in the event handler (in Java, accessed through getReplicationContext). The values these methods return relate to the event that is being processed:
- origin: the ReplicaId that originally created the event
- concurrent: whether the event was concurrent with another event, as in the second diagram above
- recoveryRunning: whether a recovery is running. Can be used to send commands back to self for side effects that should only happen once.
- currentTimeMillis: similar to System.currentTimeMillis but guaranteed never to go backwards
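To make the currentTimeMillis guarantee concrete, here is a minimal sketch of a wall clock that never goes backwards, built on an AtomicLong. It is an illustration only, not Akka's implementation, and the class name is hypothetical. The sketch goes one step further than "never backwards" and returns strictly increasing values, a design choice that keeps successive timestamps distinct even when the system clock stalls or is adjusted backwards.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical always-increasing clock, in the spirit of
// ReplicationContext.currentTimeMillis; not Akka's implementation.
final class AlwaysIncreasingClock {
  private final AtomicLong last = new AtomicLong(Long.MIN_VALUE);

  long currentTimeMillis() {
    long now = System.currentTimeMillis();
    // If the system clock has not advanced (or moved backwards), return
    // previous + 1 so that successive readings are strictly increasing.
    return last.updateAndGet(previous -> now > previous ? now : previous + 1);
  }
}
```

Because updateAndGet applies the function atomically, concurrent callers also observe strictly increasing values.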
The factory returns a Behavior that can be spawned like any other behavior.
Sharded Replicated Event Sourced entities
There are three ways to integrate replicated event sourced entities with sharding:
- Ensure that each replica has a unique entity id by using the replica id as part of the entity id
- Use a multi data center cluster to run a full copy of sharding per replica
- Use roles to run a full copy of sharding per replica
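All three options rely on each replica of the same business entity getting its own sharding entity id. The sketch below shows one way to combine the type name, business entity id and replica id into a single id and split it back, assuming the "typeName|entityId|replicaId" layout that ReplicationId.fromString is believed to parse. The class and method names are hypothetical, and rejecting '|' inside the parts is a choice of this sketch rather than documented Akka behavior.

```java
// Hypothetical sketch: encodes and decodes a sharding entity id that carries
// the type name, business entity id and replica id, assuming '|' as separator.
final class ReplicatedEntityIds {
  private static final String SEPARATOR = "|";

  private ReplicatedEntityIds() {}

  static String encode(String typeName, String entityId, String replicaId) {
    for (String part : new String[] {typeName, entityId, replicaId}) {
      // a separator inside a part would make decoding ambiguous
      if (part.contains(SEPARATOR)) {
        throw new IllegalArgumentException("'|' not allowed in: " + part);
      }
    }
    return typeName + SEPARATOR + entityId + SEPARATOR + replicaId;
  }

  // Returns {typeName, entityId, replicaId}.
  static String[] decode(String id) {
    String[] parts = id.split("\\|", -1);
    if (parts.length != 3) {
      throw new IllegalArgumentException("Invalid replicated entity id: " + id);
    }
    return parts;
  }
}
```

For example, encode("MyEntityType", "e1", "DC-A") gives "MyEntityType|e1|DC-A", while the same business entity on DC-B gets a different id, so the two replicas are distinct sharding entities.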
To simplify all three cases, the ReplicatedShardingExtension is available from the akka-cluster-sharding-typed module.
- Scala

    ReplicatedEntityProvider[Command]("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
      (entityTypeKey, replicaId) =>
        ReplicatedEntity(replicaId, Entity(entityTypeKey) { entityContext =>
          // the sharding entity id contains the business entityId, entityType, and replica id
          // which you'll need to create a ReplicatedEventSourcedBehavior
          val replicationId = ReplicationId.fromString(entityContext.entityId)
          MyEventSourcedBehavior(replicationId)
        })
    }
- Java

    return ReplicatedEntityProvider.create(
        Command.class,
        "MyReplicatedType",
        ALL_REPLICAS,
        (entityTypeKey, replicaId) ->
            ReplicatedEntity.create(
                replicaId,
                Entity.of(
                    entityTypeKey,
                    entityContext ->
                        myEventSourcedBehavior(
                            ReplicationId.fromString(entityContext.getEntityId())))));
This will run an instance of sharding per replica, and each entity id contains the replica id and the type name. Replicas could be on the same node if they end up in the same shard or if the shards get allocated to the same node.
To prevent this, roles can be used. You could, for instance, add a cluster role per availability zone or rack and have a replica per rack.
- Scala

    val provider = ReplicatedEntityProvider.perRole("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
      replicationId =>
        MyEventSourcedBehavior(replicationId)
    }
- Java

    return ReplicatedEntityProvider.create(
        Command.class,
        "MyReplicatedType",
        ALL_REPLICAS,
        (entityTypeKey, replicaId) ->
            ReplicatedEntity.create(
                replicaId,
                Entity.of(
                        entityTypeKey,
                        entityContext ->
                            myEventSourcedBehavior(
                                ReplicationId.fromString(entityContext.getEntityId())))
                    .withRole(replicaId.id())));
Lastly, if your Akka Cluster is set up across data centers (DCs), you can run a replica per DC.
- Scala

    ReplicatedEntityProvider.perDataCenter("MyEntityType", Set(ReplicaId("DC-A"), ReplicaId("DC-B"))) {
      replicationId =>
        MyEventSourcedBehavior(replicationId)
    }
- Java

    ReplicatedEntityProvider.create(
        Command.class,
        "MyReplicatedType",
        ALL_REPLICAS,
        (entityTypeKey, replicaId) ->
            ReplicatedEntity.create(
                replicaId,
                Entity.of(
                        entityTypeKey,
                        entityContext ->
                            myEventSourcedBehavior(
                                ReplicationId.fromString(entityContext.getEntityId())))
                    .withDataCenter(replicaId.id())));
Regardless of which replication strategy you use, sending messages to the replicated entities is the same. init returns a ReplicatedSharding instance, which gives access to EntityRefs for each of the replicas for arbitrary routing logic:
- Scala

    val myReplicatedSharding: ReplicatedSharding[Command] =
      ReplicatedShardingExtension(system).init(provider)

    val entityRefs: Map[ReplicaId, EntityRef[Command]] = myReplicatedSharding.entityRefsFor("myEntityId")
- Java

    ReplicatedShardingExtension extension = ReplicatedShardingExtension.get(system);
    ReplicatedSharding<Command> replicatedSharding = extension.init(provider());
    Map<ReplicaId, EntityRef<Command>> myEntityId = replicatedSharding.getEntityRefsFor("myEntityId");
More advanced routing among the replicas is currently left as an exercise for the reader (or may be covered in a future release #29281, #29319).
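As one simple example of routing over the map returned by entityRefsFor, the sketch below prefers the replica local to the caller and otherwise falls back to other replicas in a given order. The helper and its name are hypothetical, and type parameters stand in for ReplicaId and EntityRef so that the sketch compiles without Akka. How the caller knows its local replica is left as an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical routing helper. R stands in for ReplicaId and E for
// EntityRef<Command>, so the sketch compiles without Akka.
final class ReplicaRouting {

  private ReplicaRouting() {}

  // Prefer the replica co-located with the caller; otherwise use the first
  // replica in fallbackOrder that has an entry in the map.
  static <R, E> Optional<E> preferLocal(Map<R, E> refs, R localReplica, List<R> fallbackOrder) {
    E local = refs.get(localReplica);
    if (local != null) {
      return Optional.of(local);
    }
    for (R replica : fallbackOrder) {
      E ref = refs.get(replica);
      if (ref != null) {
        return Optional.of(ref);
      }
    }
    return Optional.empty();
  }
}
```

In practice the returned reference would be used to tell or ask the chosen replica; an empty result means none of the requested replicas is present in the map.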
Direct Replication of Events
Each replica reads the events of all the other replicas from the database. When used with Cluster Sharding, and to make the sharing of events with other replicas more efficient, each replica also publishes its events across the Akka cluster directly to the other replicas. The delivery of events across the cluster is not guaranteed, so the query to the journal is still needed, but it can be configured to poll the database less often since most events will arrive at the replicas through the cluster.
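Since the same event can arrive both through the cluster and through the journal query, and cluster delivery may drop or reorder events, a receiving replica has to apply each event exactly once and in order. The sketch below illustrates one way to do that: track the highest sequence number applied per origin replica and accept only the next expected one, leaving duplicates and gaps to the ordered journal query. This is an illustrative model, not Akka's implementation; that Akka performs a comparable per-origin check is an assumption here, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: per-origin deduplication of events arriving over two
// paths, best-effort cluster delivery and the journal query.
final class SeenPerReplica {
  // origin replica id -> highest sequence number applied so far
  private final Map<String, Long> highestSeen = new HashMap<>();

  // Returns true if the event should be applied now. Sequence numbers are
  // assumed to start at 1 for each origin replica.
  boolean accept(String originReplica, long sequenceNr) {
    long expected = highestSeen.getOrDefault(originReplica, 0L) + 1;
    if (sequenceNr != expected) {
      // duplicate (already applied) or gap (an earlier event is missing):
      // in both cases rely on the ordered journal query instead
      return false;
    }
    highestSeen.put(originReplica, sequenceNr);
    return true;
  }
}
```

A gap is rejected rather than buffered so that the sketch stays small; the journal query will later deliver the missing event and the ones after it in order.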
The direct replication of events feature is enabled by default when using Cluster Sharding. To disable this feature you need to:
- disable event publishing, in Scala on the EventSourcedBehavior with withEventPublishing(false), or in Java by overriding withEventPublishing from ReplicatedEventSourcedBehavior to return false, and then
- disable direct replication through withDirectReplication(false) on ReplicatedEntityProvider
The “event publishing” feature publishes each event to the local system event bus as a side effect after it has been written.
Hot Standby
If all writes occur to one replica and the other replicas are not started, there might be many replicated events to catch up with when they are later started. Therefore it can be good to activate all replicas when there is some activity.
This can be achieved automatically when ReplicatedSharding is used with direct access to replica databases and direct replication of events is enabled as described in Direct Replication of Events. When each written event is forwarded to the other replicas, it triggers them to start if they are not already started.
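The hot standby effect can be modelled in a few lines: delivering a forwarded event to an entity that is not running starts it first, so the standby replica applies events one at a time instead of replaying a long backlog later. The single-threaded sketch below uses hypothetical names and plain collections; it only models the idea and says nothing about how Akka starts sharded entities.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of hot standby: a forwarded event
// starts the target entity if it is not already running.
final class StandbyHost {
  // entity id -> events applied by the running instance
  private final Map<String, List<String>> running = new HashMap<>();
  private int startCount = 0;

  void onReplicatedEvent(String entityId, String event) {
    List<String> applied = running.get(entityId);
    if (applied == null) {
      // the forwarded event is what triggers the start
      startCount++;
      applied = new ArrayList<>();
      running.put(entityId, applied);
    }
    applied.add(event);
  }

  boolean isRunning(String entityId) {
    return running.containsKey(entityId);
  }

  int startCount() {
    return startCount;
  }

  List<String> applied(String entityId) {
    return running.getOrDefault(entityId, List.of());
  }
}
```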
Examples
More examples can be found in Replicated Event Sourcing Examples.
Journal Support
For a journal plugin to support replication, it needs to store and read metadata for each event if it is defined in the metadata field. To attach the metadata after reading it, PersistentRepr.withMetadata is used. The JournalSpec in the Persistence TCK provides a capability flag supportsMetadata to toggle verification that metadata is handled correctly.
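The storage contract described above can be sketched with an in-memory journal: on write, the optional metadata is stored next to the payload; on replay, the event is rebuilt and the metadata is attached again only if it was stored. Row and Replayed are hypothetical stand-ins for a database row and for the PersistentRepr a real plugin returns, and withMetadata here only mirrors the role of PersistentRepr.withMetadata. The metadata string in the usage is purely illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical in-memory journal: a sketch of the metadata round trip only,
// not an Akka journal plugin.
final class MetadataJournal {

  static final class Replayed {
    final long sequenceNr;
    final String payload;
    final Optional<String> metadata;

    Replayed(long sequenceNr, String payload, Optional<String> metadata) {
      this.sequenceNr = sequenceNr;
      this.payload = payload;
      this.metadata = metadata;
    }

    // mirrors the role of PersistentRepr.withMetadata: attach after reading
    Replayed withMetadata(String meta) {
      return new Replayed(sequenceNr, payload, Optional.of(meta));
    }
  }

  private static final class Row {
    final long sequenceNr;
    final String payload;
    final String metadataColumn; // null when the event had no metadata

    Row(long sequenceNr, String payload, String metadataColumn) {
      this.sequenceNr = sequenceNr;
      this.payload = payload;
      this.metadataColumn = metadataColumn;
    }
  }

  private final List<Row> rows = new ArrayList<>();

  // Write path: store the metadata alongside the payload if it is defined.
  void write(long sequenceNr, String payload, Optional<String> metadata) {
    rows.add(new Row(sequenceNr, payload, metadata.orElse(null)));
  }

  // Read path: rebuild each event, then attach metadata only if it was stored.
  List<Replayed> replay(long fromSequenceNr) {
    List<Replayed> result = new ArrayList<>();
    for (Row row : rows) {
      if (row.sequenceNr < fromSequenceNr) {
        continue;
      }
      Replayed event = new Replayed(row.sequenceNr, row.payload, Optional.empty());
      result.add(row.metadataColumn == null ? event : event.withMetadata(row.metadataColumn));
    }
    return result;
  }
}
```

For example, writing event 1 with metadata "origin=DC-A" and event 2 without metadata, then replaying from 1, returns event 1 with its metadata attached and event 2 without any.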
For a snapshot plugin to support replication, it needs to store and read metadata for the snapshot if it is defined in the metadata field. To attach the metadata when reading the snapshot, the akka.persistence.SnapshotMetadata.apply factory overload taking a metadata parameter is used. The SnapshotStoreSpec in the Persistence TCK provides a capability flag supportsMetadata to toggle verification that metadata is handled correctly.
The following plugins support Replicated Event Sourcing:
- Akka Persistence Cassandra versions 1.0.3+
- Akka Persistence R2DBC versions 1.0.0+
- Akka Persistence JDBC versions 5.0.0+