Akka Replicated Event Sourcing over gRPC
Akka Replicated Event Sourcing extends Akka Persistence to allow multiple replicas of the same entity, all accepting writes, for example in different data centers or cloud provider regions. This makes it possible to implement patterns such as active-active and hot standby.
Originally, Akka Replicated Event Sourcing required cross-replica access to the underlying replica database, which can be hard to open up for security and infrastructure reasons. It was also easiest to use in an Akka Multi DC Cluster setup, where a single cluster spans multiple data centers or regions, which can also be complicated to allow.
Akka Replicated Event Sourcing over gRPC builds on Akka Projection gRPC and Akka gRPC to instead use gRPC as the cross-replica transport for events.
The replicas are not required to share a cluster. Instead, each replica is expected to be a separate Akka cluster, with the gRPC replication transport as the only connection between them.
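To illustrate why several replicas can accept writes independently and still agree, here is a minimal sketch in plain Java. It does not use any Akka API: `CartReplica`, `ItemAdjusted`, and the replica names are hypothetical stand-ins, and passing the event object directly stands in for the gRPC transport. The sketch assumes an event handler whose result does not depend on the order in which events are applied.

```java
import java.util.HashMap;
import java.util.Map;

final class ReplicaConvergence {

    // Hypothetical event type; a real entity would use its own persisted events.
    static final class ItemAdjusted {
        final String originReplica;
        final String itemId;
        final int delta;

        ItemAdjusted(String originReplica, String itemId, int delta) {
            this.originReplica = originReplica;
            this.itemId = itemId;
            this.delta = delta;
        }
    }

    // Simplified stand-in for one replica of an entity (no Akka APIs involved).
    static final class CartReplica {
        final String replicaId;
        final Map<String, Integer> quantities = new HashMap<>();

        CartReplica(String replicaId) {
            this.replicaId = replicaId;
        }

        // A local write: create the event, apply it, and return it for replication.
        ItemAdjusted adjust(String itemId, int delta) {
            ItemAdjusted event = new ItemAdjusted(replicaId, itemId, delta);
            apply(event);
            return event;
        }

        // An event written by another replica arrives over the transport.
        void receive(ItemAdjusted event) {
            apply(event);
        }

        // Addition is commutative, so the order of application does not matter.
        private void apply(ItemAdjusted event) {
            quantities.merge(event.itemId, event.delta, Integer::sum);
        }
    }

    // Both replicas write concurrently, then exchange events in opposite orders.
    static boolean converges() {
        CartReplica a = new CartReplica("replica-a");
        CartReplica b = new CartReplica("replica-b");
        ItemAdjusted fromA = a.adjust("socks", 2);
        ItemAdjusted fromB = b.adjust("socks", 3);
        a.receive(fromB);
        b.receive(fromA);
        return a.quantities.equals(b.quantities);
    }

    public static void main(String[] args) {
        System.out.println("replicas converged: " + converges());
    }
}
```

Each replica sees the two events in a different order, yet both end with the same quantities. Event handlers that are order-sensitive would need explicit conflict resolution, which this sketch leaves out.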
This module is currently marked as May Change in the sense that the API might be changed based on feedback from initial usage. However, the module is ready for usage in production and we will not break the serialization format of messages or stored data.
Overview
For a basic overview of Replicated Event Sourcing, see the Akka Replicated Event Sourcing docs.
Akka Replicated Event Sourcing over gRPC consists of the following three parts:
- The Replicated Event Sourced Behavior is run in each replica as a sharded entity using Akka Cluster Sharding.
- The events of each replica are published to the other replicas using Akka Projection gRPC endpoints.
- Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection gRPC in Akka Sharded Daemon Process.
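The "parallel slices" in the last point can be pictured with a small sketch. It assumes the slicing scheme used by Akka Persistence, where each persistence id maps to one of 1024 slices by its hash code and the slices are divided into equal contiguous ranges, one per projection instance. The class and method names below are illustrative only and are not the Akka API.

```java
import java.util.ArrayList;
import java.util.List;

final class SliceRanges {

    // Assumption: a fixed total of 1024 slices, as in Akka Persistence.
    static final int NUMBER_OF_SLICES = 1024;

    // Inclusive range of slices handled by one projection instance.
    static final class SliceRange {
        final int min;
        final int max;

        SliceRange(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    // Maps a persistence id to a slice in [0, 1023] using its hash code.
    public static int sliceFor(String persistenceId) {
        return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
    }

    // Splits all slices into equally sized, contiguous ranges.
    public static List<SliceRange> sliceRanges(int numberOfRanges) {
        if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
            throw new IllegalArgumentException(
                "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
        }
        int size = NUMBER_OF_SLICES / numberOfRanges;
        List<SliceRange> ranges = new ArrayList<>();
        for (int i = 0; i < numberOfRanges; i++) {
            ranges.add(new SliceRange(i * size, (i + 1) * size - 1));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Four parallel consumers per remote replica, each owning 256 slices.
        for (SliceRange range : sliceRanges(4)) {
            System.out.println(range.min + "-" + range.max);
        }
    }
}
```

With four ranges, each consumer handles 256 slices (0-255, 256-511, 512-767, 768-1023), so the events of any single entity always flow through the same consumer.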
Dependencies
The functionality is provided through the akka-projection-grpc module.
Project Info: Akka Projections gRPC | |
---|---|
Artifact | com.lightbend.akka : akka-projection-grpc : 1.4.0 |
JDK versions | AdoptOpenJDK 8, AdoptOpenJDK 11 |
Scala versions | 2.13.10, 2.12.17, 3.2.2 |
JPMS module name | akka.projection.grpc |
License | |
Readiness level | Supported, Lightbend Subscription provides support. Since 1.3.0, 2020-10-23 |
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
To use the gRPC module of Akka Projections add the following dependency in your project:
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-grpc" % "1.4.0"
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
    <version>1.4.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.4.0"
}
Akka Replicated Event Sourcing over gRPC requires Akka 2.8.0 or later and can only be run in an Akka cluster since it uses cluster components.
It is currently only possible to use akka-projection-r2dbc as the projection storage and journal for this module.
The full set of dependencies needed:
- sbt
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-projection-grpc" % "1.4.0",
  "com.typesafe.akka" %% "akka-cluster-typed" % "2.8.1",
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.8.1",
  "com.lightbend.akka" %% "akka-persistence-r2dbc" % "1.1.0",
  "com.lightbend.akka" %% "akka-projection-r2dbc" % "1.1.0"
)
- Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
    <version>1.4.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
    <version>2.8.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
    <version>2.8.1</version>
  </dependency>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-r2dbc_${scala.binary.version}</artifactId>
    <version>1.1.0</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.4.0"
  implementation "com.typesafe.akka:akka-cluster-typed_${versions.ScalaBinary}:2.8.1"
  implementation "com.typesafe.akka:akka-cluster-sharding-typed_${versions.ScalaBinary}:2.8.1"
  implementation "com.lightbend.akka:akka-persistence-r2dbc_${versions.ScalaBinary}:1.1.0"
  implementation "com.lightbend.akka:akka-projection-r2dbc_${versions.ScalaBinary}:1.1.0"
}
Transitive dependencies
The table below shows the direct dependencies of akka-projection-grpc, and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.3.2 |
com.lightbend.akka | akka-projection-core_2.13 | 1.4.0 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.4.0 |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.13 |
com.typesafe.akka | akka-actor-typed_2.13 | 2.8.1 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.8.1 |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.8.1 |
com.typesafe.akka | akka-stream_2.13 | 2.8.1 |
io.grpc | grpc-stub | 1.54.1 |
org.scala-lang | scala-library | 2.13.10 |
- Dependency tree
Organization | Artifact | Version | License |
---|---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.3.2 | BUSL-1.1 |
com.google.protobuf | protobuf-java | 3.21.9 | |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.13 | Apache 2 |
com.thesamet.scalapb | lenses_2.13 | 0.11.13 | Apache 2 |
org.scala-lang.modules | scala-collection-compat_2.13 | 2.9.0 | Apache-2.0 |
org.scala-lang | scala-library | 2.13.10 | Apache-2.0 |
com.typesafe.akka | akka-discovery_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-actor_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe | config | 1.4.2 | Apache-2.0 |
org.scala-lang.modules | scala-java8-compat_2.13 | 1.0.0 | Apache-2.0 |
com.typesafe.akka | akka-http-core_2.13 | 10.5.0 | BUSL-1.1 |
com.typesafe.akka | akka-parsing_2.13 | 10.5.0 | BUSL-1.1 |
com.typesafe.akka | akka-http_2.13 | 10.5.0 | BUSL-1.1 |
com.typesafe.akka | akka-stream_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-protobuf-v3_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe | ssl-config-core_2.13 | 0.6.1 | Apache-2.0 |
org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
io.grpc | grpc-core | 1.54.1 | Apache 2.0 |
com.google.android | annotations | 4.1.1.4 | Apache 2.0 |
com.google.code.gson | gson | 2.9.0 | Apache-2.0 |
com.google.errorprone | error_prone_annotations | 2.18.0 | Apache 2.0 |
com.google.guava | guava | 31.1-android | |
com.google.code.findbugs | jsr305 | 3.0.2 | The Apache Software License, Version 2.0 |
com.google.guava | failureaccess | 1.0.1 | |
com.google.guava | listenablefuture | 9999.0-empty-to-avoid-conflict-with-guava | |
com.google.j2objc | j2objc-annotations | 1.3 | The Apache Software License, Version 2.0 |
org.checkerframework | checker-qual | 3.12.0 | The MIT License |
io.grpc | grpc-api | 1.54.1 | Apache 2.0 |
io.grpc | grpc-context | 1.54.1 | Apache 2.0 |
io.perfmark | perfmark-api | 0.25.0 | Apache 2.0 |
org.codehaus.mojo | animal-sniffer-annotations | 1.21 | |
io.grpc | grpc-netty-shaded | 1.54.1 | Apache 2.0 |
io.grpc | grpc-protobuf | 1.54.1 | Apache 2.0 |
com.google.api.grpc | proto-google-common-protos | 2.9.0 | Apache-2.0 |
io.grpc | grpc-protobuf-lite | 1.54.1 | Apache 2.0 |
com.lightbend.akka | akka-projection-core_2.13 | 1.4.0 | |
com.typesafe.akka | akka-actor-typed_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-slf4j_2.13 | 2.8.1 | BUSL-1.1 |
org.slf4j | slf4j-api | 1.7.36 | |
com.typesafe.akka | akka-persistence-query_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-persistence_2.13 | 2.8.1 | BUSL-1.1 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.4.0 | |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-remote_2.13 | 2.8.1 | BUSL-1.1 |
com.typesafe.akka | akka-pki_2.13 | 2.8.1 | BUSL-1.1 |
com.hierynomus | asn-one | 0.6.0 | The Apache License, Version 2.0 |
org.agrona | agrona | 1.17.1 | The Apache License, Version 2.0 |
com.typesafe.akka | akka-stream-typed_2.13 | 2.8.1 | BUSL-1.1 |
io.grpc | grpc-stub | 1.54.1 | Apache 2.0 |
API and setup
The same API as for a regular EventSourcedBehavior is used to define the logic. See Replicated Event Sourcing for more detail on designing an entity for replication.
To enable an entity for Replicated Event Sourcing over gRPC, use the Replication.grpcReplication method, which takes ReplicationSettings, a factory function for the behavior, and an actor system.
The factory function is passed a ReplicatedBehaviors factory that must be used to set up the replicated event sourced behavior. Its setup method provides a ReplicationContext to create an EventSourcedBehavior, which will then be configured for replication. The behavior factory can be composed with other behavior factories if access to the actor context or timers is needed.
- Scala

      def init(implicit system: ActorSystem[_]): Replication[Command] = {
        val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
        Replication.grpcReplication(replicationSettings)(ShoppingCart.apply)
      }

      def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = {
        Behaviors.setup[Command] { context =>
          replicatedBehaviors.setup { replicationContext =>
            new ShoppingCart(context, replicationContext).behavior()
          }
        }
      }

- Java

      public static Replication<Command> init(ActorSystem<?> system) {
        ReplicationSettings<Command> replicationSettings =
            ReplicationSettings.create(
                Command.class,
                "replicated-shopping-cart",
                R2dbcReplication.create(system),
                system);
        return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system);
      }

      public static Behavior<Command> create(
          ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
        return Behaviors.setup(
            context ->
                replicatedBehaviors.setup(
                    replicationContext -> new ShoppingCart(context, replicationContext)));
      }
Settings
The ReplicationSettings factory methods (apply in Scala, create in Java) can accept an entity name, a ReplicationProjectionProvider and an actor system. The configuration of that system is expected to have a top level entry with the entity name containing this structure:
    my-replicated-entity {
      # which of the replicas this node belongs to, should be the same
      # across the nodes of each replica Akka cluster.
      self-replica-id = dca

      # Pick it up from an environment variable to re-use the same config
      # without changes across replicas
      self-replica-id = ${?SELF_REPLICA}

      # max number of parallel in-flight (sent over sharding) entity updates
      # per consumer/projection
      parallel-updates = 8

      # Fail the replication stream (and restart with backoff) if completing
      # the write of a replicated event reaching the cluster takes more time
      # than this.
      entity-event-replication-timeout = 10s

      replicas: [
        {
          # Unique identifier of the replica/datacenter, is stored in the events
          # and cannot be changed after events have been persisted.
          replica-id = "dca"

          # Number of replication streams/projections to start to consume events
          # from this replica
          number-of-consumers = 4

          # Akka gRPC client config block for how to reach this replica
          # from the other replicas, note that binding the server/publishing
          # endpoint of each replica is done separately, in code.
          grpc.client {
            host = "dca.example.com"
            port = 8443
            use-tls = true
          }
        },
        {
          replica-id = "dcb"
          number-of-consumers = 4

          # Optional - only run replication stream consumers for events from the
          # remote replica on nodes with this role
          consumers-on-cluster-role = dcb-consumer

          grpc.client {
            host = "dcb.example.com"
            port = 8444
          }
        },
        {
          replica-id = "dcc"
          number-of-consumers = 4

          grpc.client {
            host = "dcc.example.com"
            port = 8445
          }
        }
      ]
    }
The top-level entries in the block refer to the local replica, while replicas is a list of all replicas, including the local replica itself, with details about how to reach each replica across the network.
The grpc.client section for each of the replicas is used for setting up the Akka gRPC client and supports the same discovery, TLS and other connection options as when using Akka gRPC directly. For more details see Akka gRPC configuration.
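As an illustration of those connection options, a replica entry could resolve its target through service discovery and trust a specific CA certificate. This is only a sketch: it assumes the service-discovery and trusted settings of the standard Akka gRPC client configuration, and the service name and certificate path are placeholders, so check the keys against the Akka gRPC reference configuration for the version in use.

```
grpc.client {
  # Assumption: look up the replica through Akka Discovery instead of a fixed host
  service-discovery {
    mechanism = "akka-dns"
    service-name = "replication.dcb.example.com"
  }
  port = 8443
  use-tls = true
  # Assumption: classpath resource with the CA certificate to trust (placeholder path)
  trusted = "certs/ca.pem"
}
```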
It is also possible to set up ReplicationSettings through APIs only and not rely on the configuration file at all.
Binding the publisher
Binding the publisher is a manual step to allow arbitrary customization of the Akka HTTP server and combining the endpoint with other HTTP and gRPC routes.
When there is only a single replicated entity and no other usage of Akka Projection gRPC in an application, a convenience is provided through createSingleServiceHandler on Replication, which will create a single handler:
- Scala

      val replicatedShoppingCart = ShoppingCart.init(system)
      // alternatively
      // val replicatedShoppingCart = ShoppingCart.initWithProducerFilter(system)
      val replicationService = replicatedShoppingCart.createSingleServiceHandler()

- Java

      Replication<ShoppingCart.Command> replicatedShoppingCart = ShoppingCart.init(system);
      // alternatively
      // Replication<ShoppingCart.Command> replicatedShoppingCart =
      //     ShoppingCart.initWithProducerFilter(system);
      Function<HttpRequest, CompletionStage<HttpResponse>> replicationService =
          replicatedShoppingCart.createSingleServiceHandler();
This can then be bound:
- Scala

      val service: HttpRequest => Future[HttpResponse] =
        ServiceHandler.concatOrNotFound(
          replicationService,
          proto.ShoppingCartServiceHandler.partial(grpcService),
          // ServerReflection enabled to support grpcurl without import-path and proto parameters
          ServerReflection.partial(List(proto.ShoppingCartService)))

      val bound =
        Http()
          .newServerAt(interface, port)
          .bind(service)
          .map(_.addToCoordinatedShutdown(3.seconds))

- Java

      Function<HttpRequest, CompletionStage<HttpResponse>> service =
          ServiceHandler.concatOrNotFound(
              replicationService,
              ShoppingCartServiceHandlerFactory.create(grpcService, system),
              // ServerReflection enabled to support grpcurl without import-path and proto parameters
              ServerReflection.create(
                  Collections.singletonList(ShoppingCartService.description), system));

      CompletionStage<ServerBinding> bound =
          Http.get(system).newServerAt(host, port).bind(service::apply);
When multiple producers exist, all instances of EventProducerSettings need to be passed at once to EventProducer.grpcServiceHandler to create a single producer service handling each of the event streams.
- Scala

      val replication: Replication[MyCommand] =
        Replication.grpcReplication(settings)(MyReplicatedBehavior.apply)

      val allSources: Set[EventProducerSource] = {
        Set(
          replication.eventProducerService,
          // producers from other replicated entities or gRPC projections
          otherReplication.eventProducerService)
      }
      val route = EventProducer.grpcServiceHandler(allSources)

      val handler = ServiceHandler.concatOrNotFound(route)

- Java

      Set<EventProducerSource> allSources = new HashSet<>();

      Replication<MyCommand> replication = ShoppingCart.init(system);
      allSources.add(replication.eventProducerService());

      // add additional EventProducerSource from other entities or
      // Akka Projection gRPC
      allSources.add(otherReplication.eventProducerService());

      Function<HttpRequest, CompletionStage<HttpResponse>> route =
          EventProducer.grpcServiceHandler(system, allSources);

      @SuppressWarnings("unchecked")
      Function<HttpRequest, CompletionStage<HttpResponse>> handler =
          ServiceHandler.concatOrNotFound(route);
The Akka HTTP server must be running with HTTP/2, which is enabled through configuration.
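For Akka HTTP this is typically a single setting. A minimal sketch, assuming the akka.http.server.enable-http2 setting of recent Akka HTTP versions (verify the key against the Akka HTTP version in use), placed in application.conf:

```
# Assumption: HTTP/2 is switched on through this Akka HTTP server setting
akka.http.server.enable-http2 = on
```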
Serialization of events
The events are serialized for being passed over the wire using the same Akka serializer as configured for serializing the events for storage.
Note that having separate replicas increases the risk that two different serialized formats and versions of the serializer are running at the same time, so extra care must be taken when changing the events and their serialization and deploying new versions of the application to the replicas.
For some scenarios it may be necessary to do a two-step deploy of format changes to not lose data, first deploy support for a new serialization format so that all replicas can deserialize it, then a second deploy where the new field is actually populated with data.
Filters
By default, events from all Replicated Event Sourced entities are replicated.
The same kind of filters as described in Akka Projection gRPC Filters can be used for Replicated Event Sourcing.
The producer defined filter:
- Scala

      def initWithProducerFilter(implicit system: ActorSystem[_]): Replication[Command] = {
        val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
        val producerFilter: EventEnvelope[Event] => Boolean = { envelope =>
          envelope.tags.contains(VipCustomerTag)
        }

        Replication.grpcReplication(replicationSettings, producerFilter)(ShoppingCart.apply)
      }

      def applyWithProducerFilter(
          replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = {
        Behaviors.setup[Command] { context =>
          replicatedBehaviors.setup { replicationContext =>
            new ShoppingCart(context, replicationContext, onlyReplicateVip = true).behavior()
          }
        }
      }

- Java

      public static Replication<Command> initWithProducerFilter(ActorSystem<?> system) {
        ReplicationSettings<Command> replicationSettings =
            ReplicationSettings.create(
                Command.class,
                "replicated-shopping-cart",
                R2dbcReplication.create(system),
                system);
        Predicate<EventEnvelope<Event>> producerFilter =
            envelope -> {
              return envelope.getTags().contains(VIP_CUSTOMER_TAG);
            };

        return Replication.grpcReplication(
            replicationSettings, producerFilter, ShoppingCart::create, system);
      }

      public static Behavior<Command> createWithProducerFilter(
          ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
        return Behaviors.setup(
            context ->
                replicatedBehaviors.setup(
                    replicationContext ->
                        new ShoppingCart(
                            context,
                            replicationContext,
                            true // onlyReplicateVip flag
                            )));
      }
Consumer defined filters are updated as described in Akka Projection gRPC Consumer defined filter.
One thing to note is that streamId is always the same as the entityType when using Replicated Event Sourcing.
The entity id based filter criteria must include the replica id as suffix to the entity id, with | as separator.
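To make the suffix rule concrete, here is a minimal Java sketch that builds the entity id string expected by entity id based filter criteria. The class and method names (ReplicaEntityIds, withReplica) and the example ids are hypothetical helpers for illustration, not part of the Akka API; only the | separator and the suffix position come from the text above.

```java
public final class ReplicaEntityIds {
    // Separator between the entity id and the replica id, as described above.
    private static final String SEPARATOR = "|";

    private ReplicaEntityIds() {}

    // Hypothetical helper: appends the replica id as a suffix to the entity id.
    public static String withReplica(String entityId, String replicaId) {
        if (entityId.isEmpty() || replicaId.isEmpty()) {
            throw new IllegalArgumentException("entityId and replicaId must be non-empty");
        }
        return entityId + SEPARATOR + replicaId;
    }

    public static void main(String[] args) {
        // Entity "cart-1" as written by replica "dca"
        System.out.println(withReplica("cart-1", "dca")); // prints cart-1|dca
    }
}
```

Under this assumption, a filter targeting the events that replica dca wrote for entity cart-1 would use cart-1|dca as the entity id in its criteria.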
Replicated Event Sourcing is bidirectional replication, and therefore you would typically have to define the same filters on both sides. That is not handled automatically.
Sample projects
Source code and build files for complete sample projects can be found in the akka/akka-projection GitHub repository.
Access control
From the consumer
The consumer can pass metadata, such as auth headers, in each request to the producer service by specifying Metadata as additionalRequestMetadata when creating each Replica.
In the producer
Authentication and authorization for the producer can be done by implementing an EventProducerInterceptor and passing it to the grpcServiceHandler method during producer bootstrap. The interceptor is invoked with the stream id and gRPC request metadata for each incoming request and can return a suitable error through GrpcServiceException.