Akka Replicated Event Sourcing over gRPC
Akka Replicated Event Sourcing extends Akka Persistence, allowing multiple replicas of the same entity, all accepting writes, to run in, for example, different data centers or cloud provider regions. This makes it possible to implement patterns such as active-active and hot standby.
Originally, Akka Replicated Event Sourcing required cross-replica access to the underlying replica database, which can be hard to open up for security and infrastructure reasons. It was also easiest to use in an Akka Multi DC Cluster setup, where a single cluster spans multiple data centers or regions, which can also be complicated to allow.
Akka Replicated Event Sourcing over gRPC builds on Akka Projection gRPC and Akka gRPC to instead use gRPC as the cross-replica transport for events.
The replicas are not required to share a cluster. Instead, each replica is expected to be a separate Akka cluster, with the gRPC replication transport as the only connection between them.
Overview
For a basic overview of Replicated Event Sourcing, see the Akka Replicated Event Sourcing docs.
Akka Replicated Event Sourcing over gRPC consists of the following three parts:
- The Replicated Event Sourced Behavior is run in each replica as a sharded entity using Akka Cluster Sharding.
- The events of each replica are published to the other replicas using Akka Projection gRPC endpoints.
- Each replica consumes a number of parallel slices of the events from each other replica by running Akka Projection gRPC in Akka Sharded Daemon Process.
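To make the idea of consuming "parallel slices" more concrete, here is a minimal plain-Java sketch of how entities could be mapped to slices and how the slices could be divided among parallel consumers. It does not use the Akka API. The names `SliceSketch`, `sliceFor`, `sliceRanges`, and `rangeIndexFor` are hypothetical, and the total of 1024 slices with a hash-modulo mapping is an assumption modeled on the slice scheme of Akka Persistence.

```java
import java.util.ArrayList;
import java.util.List;

public class SliceSketch {
    // Assumption: a fixed total of 1024 slices, as in Akka Persistence.
    public static final int NUMBER_OF_SLICES = 1024;

    /** Inclusive range of slices handled by one consumer instance. */
    public static final class SliceRange {
        public final int min;
        public final int max;

        SliceRange(int min, int max) {
            this.min = min;
            this.max = max;
        }

        @Override
        public String toString() {
            return min + "-" + max;
        }
    }

    /** Maps a persistence id to a slice in [0, NUMBER_OF_SLICES). */
    public static int sliceFor(String persistenceId) {
        // The remainder lies in (-1024, 1024), so Math.abs cannot overflow.
        return Math.abs(persistenceId.hashCode() % NUMBER_OF_SLICES);
    }

    /** Splits all slices into equally sized, contiguous ranges. */
    public static List<SliceRange> sliceRanges(int numberOfRanges) {
        if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
            throw new IllegalArgumentException(
                "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
        }
        int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
        List<SliceRange> ranges = new ArrayList<>();
        for (int i = 0; i < numberOfRanges; i++) {
            ranges.add(new SliceRange(i * rangeSize, (i + 1) * rangeSize - 1));
        }
        return ranges;
    }

    /** Index of the range (and thus the consumer) responsible for a slice. */
    public static int rangeIndexFor(int slice, int numberOfRanges) {
        return slice / (NUMBER_OF_SLICES / numberOfRanges);
    }

    public static void main(String[] args) {
        int consumers = 4; // hypothetical number of parallel consumers
        List<SliceRange> ranges = sliceRanges(consumers);
        System.out.println("Slice ranges: " + ranges);

        String persistenceId = "ShoppingCart|cart-1"; // hypothetical id
        int slice = sliceFor(persistenceId);
        int owner = rangeIndexFor(slice, consumers);
        System.out.println(persistenceId + " is in slice " + slice
            + ", handled by the consumer for range " + ranges.get(owner));
    }
}
```

In this sketch every event of a given entity always falls in the same slice, so exactly one consumer sees all events for that entity, while different entities are spread across the consumers.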
Dependencies
The functionality is provided through the akka-projection-grpc module.
Project Info: Akka Projections gRPC | |
---|---|
Artifact | com.lightbend.akka akka-projection-grpc 1.6.2 |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17, Eclipse Temurin JDK 21 |
Scala versions | 2.13.15, 3.3.4 |
JPMS module name | akka.projection.grpc |
License | |
Readiness level | Since 1.3.0, 2020-10-23 |
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt

      resolvers += "Akka library repository".at("https://repo.akka.io/maven")

- Maven

      <project>
        ...
        <repositories>
          <repository>
            <id>akka-repository</id>
            <name>Akka library repository</name>
            <url>https://repo.akka.io/maven</url>
          </repository>
        </repositories>
      </project>

- Gradle

      repositories {
          mavenCentral()
          maven {
              url "https://repo.akka.io/maven"
          }
      }

To use the gRPC module of Akka Projections add the following dependency in your project:
- sbt

      libraryDependencies += "com.lightbend.akka" %% "akka-projection-grpc" % "1.6.2"

- Maven

      <properties>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
          <version>1.6.2</version>
        </dependency>
      </dependencies>

- Gradle

      def versions = [
          ScalaBinary: "2.13"
      ]
      dependencies {
          implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.6.2"
      }

Akka Replicated Event Sourcing over gRPC requires Akka 2.8.0 or later and can only be run in an Akka cluster since it uses cluster components.
It is currently only possible to use akka-projection-r2dbc as the projection storage and journal for this module.
The full set of dependencies needed:
- sbt

      libraryDependencies ++= Seq(
        "com.lightbend.akka" %% "akka-projection-grpc" % "1.6.2",
        "com.typesafe.akka" %% "akka-cluster-typed" % "2.10.0",
        "com.typesafe.akka" %% "akka-cluster-sharding-typed" % "2.10.0",
        "com.lightbend.akka" %% "akka-persistence-r2dbc" % "1.3.0",
        "com.lightbend.akka" %% "akka-projection-r2dbc" % "1.6.2"
      )

- Maven

      <properties>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
          <version>1.6.2</version>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
          <version>2.10.0</version>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-cluster-sharding-typed_${scala.binary.version}</artifactId>
          <version>2.10.0</version>
        </dependency>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>
          <version>1.3.0</version>
        </dependency>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-projection-r2dbc_${scala.binary.version}</artifactId>
          <version>1.6.2</version>
        </dependency>
      </dependencies>

- Gradle

      def versions = [
          ScalaBinary: "2.13"
      ]
      dependencies {
          implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.6.2"
          implementation "com.typesafe.akka:akka-cluster-typed_${versions.ScalaBinary}:2.10.0"
          implementation "com.typesafe.akka:akka-cluster-sharding-typed_${versions.ScalaBinary}:2.10.0"
          implementation "com.lightbend.akka:akka-persistence-r2dbc_${versions.ScalaBinary}:1.3.0"
          implementation "com.lightbend.akka:akka-projection-r2dbc_${versions.ScalaBinary}:1.6.2"
      }

Transitive dependencies
The table below shows akka-projection-grpc’s direct dependencies, and the second tab shows all libraries it depends on transitively.
- Direct dependencies

Organization | Artifact | Version |
---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.5.0 |
com.lightbend.akka | akka-projection-core_2.13 | 1.6.2 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.6.2 |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.17 |
com.typesafe.akka | akka-actor-typed_2.13 | 2.10.0 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.10.0 |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.10.0 |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
io.grpc | grpc-stub | 1.63.2 |
org.scala-lang | scala-library | 2.13.15 |

- Dependency tree

Organization | Artifact | Version | License |
---|---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.5.0 | BUSL-1.1 |
com.google.protobuf | protobuf-java | 3.25.5 | |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.17 | Apache 2 |
com.thesamet.scalapb | lenses_2.13 | 0.11.17 | Apache 2 |
org.scala-lang.modules | scala-collection-compat_2.13 | 2.12.0 | Apache-2.0 |
org.scala-lang | scala-library | 2.13.15 | Apache-2.0 |
com.typesafe.akka | akka-discovery_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-actor_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe | config | 1.4.3 | Apache-2.0 |
com.typesafe.akka | akka-http-core_2.13 | 10.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-parsing_2.13 | 10.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-http_2.13 | 10.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-pki_2.13 | 2.10.0 | BUSL-1.1 |
com.hierynomus | asn-one | 0.6.0 | The Apache License, Version 2.0 |
org.slf4j | slf4j-api | 2.0.16 | |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-protobuf-v3_2.13 | 2.10.0 | BUSL-1.1 |
org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
io.grpc | grpc-core | 1.63.2 | Apache 2.0 |
com.google.android | annotations | 4.1.1.4 | Apache 2.0 |
com.google.code.gson | gson | 2.10.1 | Apache-2.0 |
com.google.errorprone | error_prone_annotations | 2.23.0 | Apache 2.0 |
com.google.guava | guava | 32.1.3-android | |
com.google.code.findbugs | jsr305 | 3.0.2 | The Apache Software License, Version 2.0 |
com.google.guava | failureaccess | 1.0.1 | |
com.google.guava | listenablefuture | 9999.0-empty-to-avoid-conflict-with-guava | |
com.google.j2objc | j2objc-annotations | 2.8 | Apache License, Version 2.0 |
org.checkerframework | checker-qual | 3.37.0 | The MIT License |
io.grpc | grpc-api | 1.63.2 | Apache 2.0 |
io.grpc | grpc-context | 1.63.2 | Apache 2.0 |
io.perfmark | perfmark-api | 0.26.0 | Apache 2.0 |
org.codehaus.mojo | animal-sniffer-annotations | 1.23 | |
io.grpc | grpc-netty-shaded | 1.63.2 | Apache 2.0 |
io.grpc | grpc-util | 1.63.2 | Apache 2.0 |
io.grpc | grpc-protobuf | 1.63.2 | Apache 2.0 |
com.google.api.grpc | proto-google-common-protos | 2.29.0 | Apache-2.0 |
io.grpc | grpc-protobuf-lite | 1.63.2 | Apache 2.0 |
com.lightbend.akka | akka-projection-core_2.13 | 1.6.2 | |
com.typesafe.akka | akka-actor-typed_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-slf4j_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-persistence_2.13 | 2.10.0 | BUSL-1.1 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.6.2 | |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.10.0 | BUSL-1.1 |
com.typesafe.akka | akka-remote_2.13 | 2.10.0 | BUSL-1.1 |
org.agrona | agrona | 1.22.0 | The Apache License, Version 2.0 |
com.typesafe.akka | akka-stream-typed_2.13 | 2.10.0 | BUSL-1.1 |
io.grpc | grpc-stub | 1.63.2 | Apache 2.0 |

API and setup
The same API as for regular EventSourcedBehaviors is used to define the logic. See Replicated Event Sourcing for more detail on designing an entity for replication.
To enable an entity for Replicated Event Sourcing over gRPC, use the Replication.grpcReplication method, which takes ReplicationSettings, a factory function for the behavior, and an actor system.
The factory function will be passed a ReplicatedBehaviors factory that must be used to set up the replicated event sourced behavior. Its setup method provides a ReplicationContext to create an EventSourcedBehavior, which will then be configured for replication. The behavior factory can be composed with other behavior factories if access to the actor context or timers is needed.
- Scala
-
source
def init(implicit system: ActorSystem[_]): Replication[Command] = {
  val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
  Replication.grpcReplication(replicationSettings)(ShoppingCart.apply)
}

def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = {
  Behaviors.setup[Command] { context =>
    replicatedBehaviors.setup { replicationContext =>
      new ShoppingCart(context, replicationContext).behavior()
    }
  }
}
- Java
-
source
public static Replication<Command> init(ActorSystem<?> system) {
  ReplicationSettings<Command> replicationSettings =
      ReplicationSettings.create(
          Command.class,
          ShoppingCart.ENTITY_TYPE,
          R2dbcReplication.create(system),
          system);
  return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system);
}

public static Behavior<Command> create(
    ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
  return Behaviors.setup(
      context ->
          replicatedBehaviors.setup(
              replicationContext -> new ShoppingCart(context, replicationContext)));
}
Settings
The ReplicationSettings apply (Scala) and create (Java) factory methods accept an entity name, a ReplicationProjectionProvider and an actor system. The configuration of that system is expected to have a top-level entry named after the entity, containing this structure:
source
# Replication configuration for the ShoppingCart. Note that config `replicated-shopping-cart`
# is the same as the ShoppingCart entity type name.
replicated-shopping-cart {
  # which of the replicas this node belongs to, should be the same
  # across the nodes of each replica Akka cluster.
  self-replica-id = us-east-1
  # Pick it up from an environment variable to re-use the same config
  # without changes across replicas
  self-replica-id = ${?SELF_REPLICA}
  # max number of parallel in-flight (sent over sharding) entity updates
  # per consumer/projection
  parallel-updates = 8
  # Fail the replication stream (and restart with backoff) if completing
  # the write of a replicated event reaching the cluster takes more time
  # than this.
  entity-event-replication-timeout = 10s

  replicas: [
    {
      # Unique identifier of the replica/datacenter, is stored in the events
      # and cannot be changed after events have been persisted.
      replica-id = "us-east-1"

      # Number of replication streams/projections to start to consume events
      # from this replica
      number-of-consumers = 4

      # Akka gRPC client config block for how to reach this replica
      # from the other replicas, note that binding the server/publishing
      # endpoint of each replica is done separately, in code.
      grpc.client {
        host = "k8s-shopping-604179632a-148180922.us-east-2.elb.amazonaws.com"
        host = ${?US_EAST_1_GRPC_HOST}
        port = 443
        port = ${?US_EAST_1_GRPC_PORT}
        use-tls = true
      }
    },
    {
      replica-id = "eu-west-1"
      number-of-consumers = 4

      # Optional - only run replication stream consumers for events from the
      # remote replica on nodes with this role
      consumers-on-cluster-role = replication-consumer

      grpc.client {
        host = "k8s-shopping-19708e1324-24617530ddc6d2cb.elb.eu-west-1.amazonaws.com"
        host = ${?EU_WEST_1_GRPC_HOST}
        port = 443
        port = ${?EU_WEST_1_GRPC_PORT}
      }
    }
  ]
}
The top-level entries in the block refer to the local replica, while replicas is a list of all replicas, including the local one, with details about how to reach each replica across the network.
The grpc.client section for each of the replicas is used for setting up the Akka gRPC client and supports the same discovery, TLS and other connection options as when using Akka gRPC directly. For more details see Akka gRPC configuration.
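As a hedged illustration of those options, the replica entry below resolves the remote replica through Akka Discovery rather than a fixed host name. The service-discovery keys follow the Akka gRPC client configuration. The akka-dns mechanism and the service name are assumptions made up for this sketch and do not come from the sample project, so verify them against the Akka gRPC configuration reference for the version in use.

```hocon
replicas: [
  {
    replica-id = "eu-west-1"
    number-of-consumers = 4

    grpc.client {
      # Assumption: look up the remote replica with Akka Discovery
      # instead of a static host entry.
      service-discovery {
        mechanism = "akka-dns"
        # Hypothetical service name, used here for illustration only
        service-name = "shopping-cart-replication.example.com"
      }
      # Port setting kept from the static example; verify how the chosen
      # discovery mechanism supplies the port in your environment.
      port = 443
      use-tls = true
    }
  }
]
```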
It is also possible to set up ReplicationSettings through APIs only and not rely on the configuration file at all.
Fully connected topology
In a network topology where each replica cluster can connect to every other replica cluster, the configuration should list all replicas and a gRPC server must be started in each replica.
Binding the publisher
Binding the publisher is a manual step to allow arbitrary customization of the Akka HTTP server and combining the endpoint with other HTTP and gRPC routes.
When there is only a single replicated entity and no other usage of Akka Projection gRPC in an application, a convenience is provided through createSingleServiceHandler on Replication, which will create a single handler:
- Scala
-
source
val replicatedShoppingCart = ShoppingCart.init(system)
// alternatively
// val replicatedShoppingCart = ShoppingCart.initWithProducerFilter(system)
val replicationService = replicatedShoppingCart.createSingleServiceHandler()
- Java
-
source
Replication<ShoppingCart.Command> replicatedShoppingCart = ShoppingCart.init(system);
// alternatively
// Replication<ShoppingCart.Command> replicatedShoppingCart =
//     ShoppingCart.initWithProducerFilter(system);
Function<HttpRequest, CompletionStage<HttpResponse>> replicationService =
    replicatedShoppingCart.createSingleServiceHandler();
This can then be bound:
- Scala
-
source
val service: HttpRequest => Future[HttpResponse] =
  ServiceHandler.concatOrNotFound(
    replicationService,
    proto.ShoppingCartServiceHandler.partial(grpcService),
    // ServerReflection enabled to support grpcurl without import-path and proto parameters
    ServerReflection.partial(List(proto.ShoppingCartService)))

val bound =
  Http()
    .newServerAt(interface, port)
    .bind(service)
    .map(_.addToCoordinatedShutdown(3.seconds))
- Java
-
source
Function<HttpRequest, CompletionStage<HttpResponse>> service =
    ServiceHandler.concatOrNotFound(
        replicationService,
        ShoppingCartServiceHandlerFactory.create(grpcService, system),
        // ServerReflection enabled to support grpcurl without import-path and proto parameters
        ServerReflection.create(
            Collections.singletonList(ShoppingCartService.description), system));

CompletionStage<ServerBinding> bound =
    Http.get(system).newServerAt(host, port).bind(service::apply);
When multiple producers exist, all instances of EventProducerSource need to be passed at once to EventProducer.grpcServiceHandler to create a single producer service handling each of the event streams.
- Scala
-
source
val replication: Replication[MyCommand] =
  Replication.grpcReplication(settings)(MyReplicatedBehavior.apply)

val allSources: Set[EventProducerSource] = {
  Set(
    replication.eventProducerSource,
    // producers from other replicated entities or gRPC projections
    otherReplication.eventProducerSource)
}
val route = EventProducer.grpcServiceHandler(allSources)

val handler = ServiceHandler.concatOrNotFound(route)
- Java
-
source
Set<EventProducerSource> allSources = new HashSet<>();

Replication<MyCommand> replication = ShoppingCart.init(system);
allSources.add(replication.eventProducerService());

// add additional EventProducerSource from other entities or
// Akka Projection gRPC
allSources.add(otherReplication.eventProducerService());

Function<HttpRequest, CompletionStage<HttpResponse>> route =
    EventProducer.grpcServiceHandler(system, allSources);

@SuppressWarnings("unchecked")
Function<HttpRequest, CompletionStage<HttpResponse>> handler =
    ServiceHandler.concatOrNotFound(route);
The Akka HTTP server must be running with HTTP/2 enabled, which is done through configuration.
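A minimal sketch of that configuration, assuming a recent Akka HTTP release in which the setting is named akka.http.server.enable-http2 (older releases used akka.http.server.preview.enable-http2):

```hocon
# application.conf
# Enable HTTP/2 on the Akka HTTP server, which gRPC requires
akka.http.server.enable-http2 = on
```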
Edge topology
In some use cases it is not possible to use a fully connected topology, for example because of firewalls or NAT in front of each producer. The consumer may also not know about all producers up front.
This is typical when using Replicated Event Sourcing at the edge, where the connection can only be established from the edge service to the cloud service.
For this purpose, Akka Replicated Event Sourcing over gRPC has a mode where the replication streams for both consuming and producing events are initiated by one side. In this way a star topology can be defined, and it can be combined with replicas that are fully connected.
You would still define how to connect to other replicas as described above, but it’s only needed on the edge side, and it would typically only define one or a few cloud replicas that it will connect to. A gRPC server is not needed on the edge side, because there are no incoming connections.
On the edge side you start with Replication.grpcEdgeReplication.
- Scala
-
source
def initEdge(implicit system: ActorSystem[_]): EdgeReplication[Command] = {
  val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
  Replication.grpcEdgeReplication(replicationSettings)(ShoppingCart.apply)
}
- Java
-
source
public static EdgeReplication<Command> initEdge(ActorSystem<?> system) {
  ReplicationSettings<Command> replicationSettings =
      ReplicationSettings.create(
          Command.class,
          ShoppingCart.ENTITY_TYPE,
          R2dbcReplication.create(system),
          system);
  return Replication.grpcEdgeReplication(replicationSettings, ShoppingCart::create, system);
}
On the cloud side you would start with Replication.grpcReplication as described above, but with the addition of withEdgeReplication(true) in the ReplicationSettings, or by enabling the akka.projection.grpc.replication.accept-edge-replication configuration.
- Scala
-
source
def initAllowEdge(implicit system: ActorSystem[_]): Replication[Command] = {
  val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
    .withEdgeReplication(true)
  // The cloud side uses the regular grpcReplication, not grpcEdgeReplication
  Replication.grpcReplication(replicationSettings)(ShoppingCart.apply)
}
- Java
-
source
public static Replication<Command> initAllowEdge(ActorSystem<?> system) {
  ReplicationSettings<Command> replicationSettings =
      ReplicationSettings.create(
              Command.class,
              ShoppingCart.ENTITY_TYPE,
              R2dbcReplication.create(system),
              system)
          .withEdgeReplication(true);
  return Replication.grpcReplication(replicationSettings, ShoppingCart::create, system);
}
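The configuration-based alternative to withEdgeReplication(true) can be sketched as follows, using the setting named above. Placing it in the application.conf of the cloud replica is an assumption about the deployment; on is standard HOCON for a true boolean.

```hocon
# application.conf of the cloud replica (assumed location)
# Accept replication streams initiated by edge replicas
akka.projection.grpc.replication.accept-edge-replication = on
```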
Serialization of events
The events are serialized for being passed over the wire using the same Akka serializer as configured for serializing the events for storage.
Note that having separate replicas increases the risk that two different serialized formats and versions of the serializer are running at the same time, so extra care must be taken when changing the events and their serialization and deploying new versions of the application to the replicas.
For some scenarios it may be necessary to do a two-step deploy of format changes to avoid losing data: first deploy support for the new serialization format so that all replicas can deserialize it, then do a second deploy in which the new field is actually populated with data.
Filters
By default, events from all Replicated Event Sourced entities are replicated.
The same kind of filters as described in Akka Projection gRPC Filters can be used for Replicated Event Sourcing.
The producer filter is defined with withProducerFilter or withProducerFilterTopicExpression in ReplicationSettings:
- Scala
-
source
def initWithProducerFilter(implicit system: ActorSystem[_]): Replication[Command] = {
  val producerFilter: EventEnvelope[Event] => Boolean = { envelope =>
    envelope.tags.contains(VipCustomerTag)
  }
  val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication())
    .withProducerFilter(producerFilter)

  Replication.grpcReplication(replicationSettings)(ShoppingCart.applyWithProducerFilter)
}

def applyWithProducerFilter(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = {
  Behaviors.setup[Command] { context =>
    replicatedBehaviors.setup { replicationContext =>
      new ShoppingCart(context, replicationContext, onlyReplicateVip = true).behavior()
    }
  }
}
- Java
-
source
public static Replication<Command> initWithProducerFilter(ActorSystem<?> system) {
  Predicate<EventEnvelope<Event>> producerFilter =
      envelope -> envelope.getTags().contains(VIP_CUSTOMER_TAG);

  ReplicationSettings<Command> replicationSettings =
      ReplicationSettings.create(
              Command.class,
              ShoppingCart.ENTITY_TYPE,
              R2dbcReplication.create(system),
              system)
          .withProducerFilter(producerFilter);

  return Replication.grpcReplication(
      replicationSettings, ShoppingCart::createWithProducerFilter, system);
}

public static Behavior<Command> createWithProducerFilter(
    ReplicatedBehaviors<Command, Event, State> replicatedBehaviors) {
  return Behaviors.setup(
      context ->
          replicatedBehaviors.setup(
              replicationContext ->
                  new ShoppingCart(
                      context,
                      replicationContext,
                      true // onlyReplicateVip flag
                      )));
}
The initial consumer filter is defined with withInitialConsumerFilter in ReplicationSettings. Consumer defined filters can be updated at runtime as described in Akka Projection gRPC Consumer defined filter.
One thing to note is that streamId is always the same as the entityType when using Replicated Event Sourcing.
The entity id based filter criteria must include the replica id as a suffix to the entity id, with | as the separator.
Sample projects
Source code and build files for complete sample projects can be found in the Akka Distributed Cluster Guide and Akka Edge Guide.
Security
Mutual authentication with TLS can be set up according to the Akka gRPC documentation.
Access control
From the consumer
The consumer can pass metadata, such as auth headers, in each request to the producer service by specifying Metadata as additionalRequestMetadata when creating each Replica.
In the producer
Authentication and authorization for the producer can be done by implementing an EventProducerInterceptor and passing it to the grpcServiceHandler method during producer bootstrap. The interceptor is invoked with the stream id and gRPC request metadata for each incoming request and can return a suitable error through GrpcServiceException.
Migrating from non-replicated
It is possible to migrate from an ordinary single-writer EventSourcedBehavior to a ReplicatedEventSourcedBehavior. The events are stored in the same way, aside from some metadata that is filled in automatically if it’s missing.
The ReplicaId for the replica where the original entity was located should be empty. This makes sure that the same PersistenceId and the same events are used for the original replica.
The aspects of Resolving conflicting updates must be considered in the logic of the event handler when migrating to Replicated Event Sourcing.