Akka Projection gRPC
Akka Projection gRPC can be used to implement asynchronous, event-based service-to-service communication. It provides an implementation of an Akka Projection that uses Akka gRPC as the underlying transport between event producer and consumer.
This module is currently marked as May Change in the sense that the API might be changed based on feedback from initial usage. However, the module is ready for usage in production and we will not break serialization format of messages or stored data.
Overview
- An Entity stores events in its journal in service A.
- A consumer in service B starts an Akka Projection, which reads its locally stored offset for service A’s replication stream.
- Service B establishes a replication stream from service A.
- Events are read from the journal.
- Event is emitted to the replication stream.
- Event is handled.
- Offset is stored.
- Producer continues to read new events from the journal and emit to the stream. As an optimization, events can also be published directly from the entity to the producer.
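The steps above can be illustrated with a small, self-contained sketch. This is a toy model using only the Java standard library, not the Akka Projection gRPC API: the `Producer`, `Consumer` and `JournalEvent` classes are hypothetical, the gRPC stream is replaced by a plain method call, and a single sequence number stands in for the real offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the replication flow; none of these types are Akka APIs.
class ReplicationFlowSketch {

    // An event in service A's journal, identified by its sequence number.
    static final class JournalEvent {
        final long seqNr;
        final String payload;

        JournalEvent(long seqNr, String payload) {
            this.seqNr = seqNr;
            this.payload = payload;
        }
    }

    // Service A: an append-only journal that can be read after a given offset.
    static final class Producer {
        private final List<JournalEvent> journal = new ArrayList<>();

        void persist(String payload) {
            journal.add(new JournalEvent(journal.size() + 1L, payload));
        }

        // Returns the events whose sequence number is greater than the offset.
        List<JournalEvent> eventsAfter(long offset) {
            List<JournalEvent> result = new ArrayList<>();
            for (JournalEvent event : journal) {
                if (event.seqNr > offset) {
                    result.add(event);
                }
            }
            return result;
        }
    }

    // Service B: stores its own offset and handles events it has not seen.
    static final class Consumer {
        private long storedOffset = 0L;
        final List<String> handled = new ArrayList<>();

        // Reads the stored offset, pulls newer events, handles each one,
        // and stores the offset only after the event has been handled.
        void replicateFrom(Producer producer) {
            for (JournalEvent event : producer.eventsAfter(storedOffset)) {
                handled.add(event.payload);
                storedOffset = event.seqNr;
            }
        }

        long storedOffset() {
            return storedOffset;
        }
    }

    public static void main(String[] args) {
        Producer serviceA = new Producer();
        Consumer serviceB = new Consumer();

        serviceA.persist("ItemAdded");
        serviceA.persist("ItemRemoved");
        serviceB.replicateFrom(serviceA);

        // A later run resumes from the stored offset instead of replaying.
        serviceA.persist("CheckedOut");
        serviceB.replicateFrom(serviceA);

        System.out.println(serviceB.handled + " offset=" + serviceB.storedOffset());
    }
}
```

Because the offset is stored only after an event has been handled, a consumer that stops between the two steps would see that event again on restart. This is why handlers in an at-least-once setup are usually written to tolerate duplicates.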
Dependencies
To use the gRPC module of Akka Projections, add the following dependency to your project:
- sbt
    libraryDependencies += "com.lightbend.akka" %% "akka-projection-grpc" % "1.3.1"
- Maven
    <properties>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.lightbend.akka</groupId>
        <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
        <version>1.3.1</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.3.1"
    }
Akka Projections require Akka 2.7.0 or later, see Akka version.
Project Info: Akka Projections gRPC | |
---|---|
Artifact | com.lightbend.akka akka-projection-grpc 1.3.1 |
JDK versions | AdoptOpenJDK 8, AdoptOpenJDK 11 |
Scala versions | 2.13.10, 2.12.17 |
JPMS module name | akka.projection.grpc |
License | |
Readiness level | Supported, Lightbend Subscription provides support. Since 1.3.0, 2020-10-23 |
Home page | https://akka.io |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/akka-projection |
Transitive dependencies
The first table below shows akka-projection-grpc’s direct dependencies; the second shows all libraries it depends on transitively.
- Direct dependencies

Organization | Artifact | Version |
---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.2.1 |
com.lightbend.akka | akka-projection-core_2.13 | 1.3.1 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.3.1 |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.11 |
com.typesafe.akka | akka-actor-typed_2.13 | 2.7.0 |
com.typesafe.akka | akka-persistence-query_2.13 | 2.7.0 |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.7.0 |
com.typesafe.akka | akka-stream_2.13 | 2.7.0 |
io.grpc | grpc-stub | 1.48.1 |
org.scala-lang | scala-library | 2.13.10 |

- Dependency tree
Libraries in the tree, each listed once:

Organization | Artifact | Version | License |
---|---|---|---|
com.lightbend.akka.grpc | akka-grpc-runtime_2.13 | 2.2.1 | BUSL-1.1 |
com.google.protobuf | protobuf-java | 3.21.9 | |
com.thesamet.scalapb | scalapb-runtime_2.13 | 0.11.11 | Apache 2 |
com.thesamet.scalapb | lenses_2.13 | 0.11.11 | Apache 2 |
org.scala-lang.modules | scala-collection-compat_2.13 | 2.7.0 | Apache-2.0 |
org.scala-lang | scala-library | 2.13.10 | Apache-2.0 |
com.typesafe.akka | akka-discovery_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-actor_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe | config | 1.4.2 | Apache-2.0 |
org.scala-lang.modules | scala-java8-compat_2.13 | 1.0.0 | Apache-2.0 |
com.typesafe.akka | akka-http-core_2.13 | 10.4.0 | BUSL-1.1 |
com.typesafe.akka | akka-parsing_2.13 | 10.4.0 | BUSL-1.1 |
com.typesafe.akka | akka-http_2.13 | 10.4.0 | BUSL-1.1 |
com.typesafe.akka | akka-stream_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-protobuf-v3_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe | ssl-config-core_2.13 | 0.4.3 | Apache-2.0 |
org.scala-lang.modules | scala-parser-combinators_2.13 | 1.1.2 | Apache-2.0 |
org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
io.grpc | grpc-core | 1.48.1 | Apache 2.0 |
com.google.android | annotations | 4.1.1.4 | Apache 2.0 |
com.google.code.gson | gson | 2.9.0 | Apache-2.0 |
com.google.errorprone | error_prone_annotations | 2.14.0 | Apache 2.0 |
com.google.guava | guava | 31.1-android | |
com.google.code.findbugs | jsr305 | 3.0.2 | The Apache Software License, Version 2.0 |
com.google.guava | failureaccess | 1.0.1 | |
com.google.guava | listenablefuture | 9999.0-empty-to-avoid-conflict-with-guava | |
com.google.j2objc | j2objc-annotations | 1.3 | The Apache Software License, Version 2.0 |
org.checkerframework | checker-qual | 3.12.0 | The MIT License |
io.grpc | grpc-api | 1.48.1 | Apache 2.0 |
io.grpc | grpc-context | 1.48.1 | Apache 2.0 |
io.perfmark | perfmark-api | 0.25.0 | Apache 2.0 |
org.codehaus.mojo | animal-sniffer-annotations | 1.21 | |
io.grpc | grpc-netty-shaded | 1.48.1 | Apache 2.0 |
io.grpc | grpc-protobuf | 1.48.1 | Apache 2.0 |
com.google.api.grpc | proto-google-common-protos | 2.9.0 | Apache-2.0 |
io.grpc | grpc-protobuf-lite | 1.48.1 | Apache 2.0 |
com.lightbend.akka | akka-projection-core_2.13 | 1.3.1 | |
com.typesafe.akka | akka-actor-typed_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-slf4j_2.13 | 2.7.0 | BUSL-1.1 |
org.slf4j | slf4j-api | 1.7.36 | |
com.typesafe.akka | akka-persistence-query_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-persistence_2.13 | 2.7.0 | BUSL-1.1 |
com.lightbend.akka | akka-projection-eventsourced_2.13 | 1.3.1 | |
com.typesafe.akka | akka-persistence-typed_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-remote_2.13 | 2.7.0 | BUSL-1.1 |
com.typesafe.akka | akka-pki_2.13 | 2.7.0 | BUSL-1.1 |
com.hierynomus | asn-one | 0.6.0 | The Apache License, Version 2.0 |
org.agrona | agrona | 1.16.0 | The Apache License, Version 2.0 |
com.typesafe.akka | akka-stream-typed_2.13 | 2.7.0 | BUSL-1.1 |
io.grpc | grpc-stub | 1.48.1 | Apache 2.0 |
Consumer
On the consumer side the Projection is an ordinary SourceProvider for eventsBySlices that uses eventsBySlices from the GrpcReadJournal.
- Scala
    import scala.concurrent.Future

    import akka.Done
    import akka.actor.typed.ActorSystem
    import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
    import akka.persistence.Persistence
    import akka.persistence.query.typed.EventEnvelope
    import akka.projection.ProjectionBehavior
    import akka.projection.ProjectionId
    import akka.projection.eventsourced.scaladsl.EventSourcedProvider
    import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
    import akka.projection.r2dbc.scaladsl.R2dbcProjection
    import akka.projection.scaladsl.Handler
    import org.slf4j.LoggerFactory
    import shoppingcart.CheckedOut
    import shoppingcart.ItemAdded
    import shoppingcart.ItemQuantityAdjusted
    import shoppingcart.ItemRemoved
    import shoppingcart.ShoppingCartEventsProto

    object ShoppingCartEventConsumer {

      def init(system: ActorSystem[_]): Unit = {
        implicit val sys: ActorSystem[_] = system
        val numberOfProjectionInstances = 4
        val projectionName: String = "cart-events"
        val sliceRanges =
          Persistence(system).sliceRanges(numberOfProjectionInstances)

        val eventsBySlicesQuery =
          GrpcReadJournal(List(ShoppingCartEventsProto.javaDescriptor))

        ShardedDaemonProcess(system).init(
          projectionName,
          numberOfProjectionInstances,
          { idx =>
            val sliceRange = sliceRanges(idx)
            val projectionKey =
              s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
            val projectionId = ProjectionId.of(projectionName, projectionKey)

            val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
              system,
              eventsBySlicesQuery,
              eventsBySlicesQuery.streamId,
              sliceRange.min,
              sliceRange.max)

            ProjectionBehavior(
              R2dbcProjection.atLeastOnceAsync(
                projectionId,
                None,
                sourceProvider,
                () => new EventHandler(projectionId)))
          })
      }
    }
- Java
    import akka.actor.typed.ActorSystem;
    import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
    import akka.japi.Pair;
    import akka.persistence.Persistence;
    import akka.persistence.query.Offset;
    import akka.persistence.query.typed.EventEnvelope;
    import akka.projection.ProjectionBehavior;
    import akka.projection.ProjectionId;
    import akka.projection.eventsourced.javadsl.EventSourcedProvider;
    import akka.projection.grpc.consumer.javadsl.GrpcReadJournal;
    import akka.projection.javadsl.SourceProvider;
    import akka.projection.r2dbc.javadsl.R2dbcProjection;
    import java.util.List;
    import java.util.Optional;
    import shopping.cart.proto.ShoppingCartEvents;

    class ShoppingCartEventConsumer {

      public static void init(ActorSystem<?> system) {
        int numberOfProjectionInstances = 4;
        String projectionName = "cart-events";
        List<Pair<Integer, Integer>> sliceRanges =
            Persistence.get(system).getSliceRanges(numberOfProjectionInstances);

        GrpcReadJournal eventsBySlicesQuery =
            GrpcReadJournal.create(
                system,
                List.of(ShoppingCartEvents.getDescriptor()));

        ShardedDaemonProcess.get(system).init(
            ProjectionBehavior.Command.class,
            projectionName,
            numberOfProjectionInstances,
            idx -> {
              Pair<Integer, Integer> sliceRange = sliceRanges.get(idx);
              String projectionKey =
                  eventsBySlicesQuery.streamId() + "-" + sliceRange.first() + "-" + sliceRange.second();
              ProjectionId projectionId = ProjectionId.of(projectionName, projectionKey);

              SourceProvider<Offset, EventEnvelope<Object>> sourceProvider =
                  EventSourcedProvider.eventsBySlices(
                      system,
                      eventsBySlicesQuery,
                      eventsBySlicesQuery.streamId(),
                      sliceRange.first(),
                      sliceRange.second());

              return ProjectionBehavior.create(
                  R2dbcProjection.atLeastOnceAsync(
                      projectionId,
                      Optional.empty(),
                      sourceProvider,
                      () -> new EventHandler(projectionId),
                      system));
            },
            ProjectionBehavior.stopMessage());
      }
    }
The Protobuf descriptors are defined when the GrpcReadJournal is created. The descriptors are used when deserializing the received events. The protobufDescriptors is a list of the javaDescriptor for the used protobuf messages. It is defined in the ScalaPB generated Proto companion object. Note that when using Protobuf serialization, the GrpcReadJournal should be created with the GrpcReadJournal apply (Scala) or create (Java) factory method and not from configuration via GrpcReadJournalProvider.
The gRPC connection to the producer is defined in the consumer configuration.
The R2dbcProjection has support for storing the offset in a relational database using R2DBC.
The above example is using the ShardedDaemonProcess to distribute the instances of the Projection across the cluster. There are alternative ways of running the ProjectionBehavior
as described in Running a Projection.
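To make the distribution concrete, the following is a minimal sketch of how the slice ranges and projection keys in the consumer example come out. It is plain Java and does not call Akka; `SliceRangesSketch` and its members are hypothetical names, and the total of 1024 slices is an assumption about Akka Persistence rather than something stated on this page.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the slice range arithmetic; this is not the Akka Persistence API.
final class SliceRangesSketch {

    // Assumption: persistence ids are partitioned into a fixed total of 1024 slices.
    static final int NUMBER_OF_SLICES = 1024;

    // Inclusive [min, max] range of slices handled by one Projection instance.
    static final class SliceRange {
        final int min;
        final int max;

        SliceRange(int min, int max) {
            this.min = min;
            this.max = max;
        }
    }

    // Splits all slices into equally sized, contiguous ranges.
    static List<SliceRange> sliceRanges(int numberOfRanges) {
        if (numberOfRanges <= 0 || NUMBER_OF_SLICES % numberOfRanges != 0) {
            throw new IllegalArgumentException(
                "numberOfRanges must evenly divide " + NUMBER_OF_SLICES);
        }
        int rangeSize = NUMBER_OF_SLICES / numberOfRanges;
        List<SliceRange> ranges = new ArrayList<>();
        for (int i = 0; i < numberOfRanges; i++) {
            ranges.add(new SliceRange(i * rangeSize, (i + 1) * rangeSize - 1));
        }
        return ranges;
    }

    // Same key format as the consumer example: streamId-min-max.
    static String projectionKey(String streamId, SliceRange range) {
        return streamId + "-" + range.min + "-" + range.max;
    }

    public static void main(String[] args) {
        // One key per Projection instance, as started by ShardedDaemonProcess.
        for (SliceRange range : sliceRanges(4)) {
            System.out.println(projectionKey("cart", range));
        }
    }
}
```

Under these assumptions, four instances with stream id "cart" get the keys cart-0-255, cart-256-511, cart-512-767 and cart-768-1023. Because the range is part of the key, each instance tracks its own offset.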
How to implement the EventHandler
and choose between different processing semantics is described in the R2dbcProjection documentation.
gRPC client lifecycle
When creating the GrpcReadJournal, a gRPC client is created for the target producer. The same GrpcReadJournal instance and its gRPC client should be shared for the same target producer. The code examples above will share the instance between different Projection instances running in the same ActorSystem. The gRPC clients will automatically be closed when the ActorSystem is terminated.
If there is a need to close the gRPC client before ActorSystem termination, the close() method of the GrpcReadJournal can be called. After closing, the GrpcReadJournal instance cannot be used again.
Producer
Akka Projections gRPC provides the gRPC service implementation that is used by the consumer side. It is created with the EventProducer:
- Scala
    import akka.actor.typed.ActorSystem
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.model.HttpResponse
    import akka.projection.grpc.producer.EventProducerSettings
    import akka.projection.grpc.producer.scaladsl.EventProducer
    import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation

    import scala.concurrent.Future

    def eventProducerService(system: ActorSystem[_]): PartialFunction[HttpRequest, Future[HttpResponse]] = {
      val transformation = Transformation.empty
        .registerMapper[ShoppingCart.ItemAdded, proto.ItemAdded](event => Some(transformItemAdded(event)))
        .registerMapper[ShoppingCart.ItemQuantityAdjusted, proto.ItemQuantityAdjusted](event =>
          Some(transformItemQuantityAdjusted(event)))
        .registerMapper[ShoppingCart.ItemRemoved, proto.ItemRemoved](event => Some(transformItemRemoved(event)))
        .registerMapper[ShoppingCart.CheckedOut, proto.CheckedOut](event => Some(transformCheckedOut(event)))

      val eventProducerSource = EventProducer.EventProducerSource(
        "ShoppingCart",
        "cart",
        transformation,
        EventProducerSettings(system)
      )

      EventProducer.grpcServiceHandler(eventProducerSource)(system)
    }
- Java
    import akka.actor.typed.ActorSystem;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.japi.function.Function;
    import akka.projection.grpc.producer.EventProducerSettings;
    import akka.projection.grpc.producer.javadsl.EventProducer;
    import akka.projection.grpc.producer.javadsl.EventProducerSource;
    import akka.projection.grpc.producer.javadsl.Transformation;

    import java.util.Optional;
    import java.util.concurrent.CompletionStage;

    public static Function<HttpRequest, CompletionStage<HttpResponse>> eventProducerService(ActorSystem<?> system) {
      Transformation transformation =
          Transformation.empty()
              .registerMapper(ShoppingCart.ItemAdded.class, event -> Optional.of(transformItemAdded(event)))
              .registerMapper(ShoppingCart.ItemQuantityAdjusted.class, event -> Optional.of(transformItemQuantityAdjusted(event)))
              .registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event)))
              .registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event)));

      EventProducerSource eventProducerSource = new EventProducerSource(
          "ShoppingCart",
          "cart",
          transformation,
          EventProducerSettings.apply(system)
      );

      return EventProducer.grpcServiceHandler(system, eventProducerSource);
    }
Events can be transformed by application specific code on the producer side. The purpose is to be able to have a different public representation from the internal representation (stored in journal). The transformation functions are registered when creating the EventProducer
service. Here is an example of one of those transformation functions:
- Scala
    private def transformItemAdded(added: ShoppingCart.ItemAdded): proto.ItemAdded =
      proto.ItemAdded(
        cartId = added.cartId,
        itemId = added.itemId,
        quantity = added.quantity)
- Java
    private static shopping.cart.proto.ItemAdded transformItemAdded(ShoppingCart.ItemAdded itemAdded) {
      return shopping.cart.proto.ItemAdded.newBuilder()
          .setCartId(itemAdded.cartId)
          .setItemId(itemAdded.itemId)
          .setQuantity(itemAdded.quantity)
          .build();
    }
To omit an event the transformation function can return None (Scala) or Optional.empty() (Java).
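The idea of mapping internal events to a public representation, and omitting some of them, can be sketched without Akka on the classpath. The following is a simplified stand-in, not the library's Transformation class: `TransformationSketch`, `InternalAuditNote` and `PublicItemAdded` are hypothetical names, and rejecting unregistered event types is an assumption of this sketch rather than documented behavior.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Simplified model of per-event-type mappers; not the library's Transformation.
final class TransformationSketch {

    // Hypothetical internal event, as stored in the journal.
    static final class ItemAdded {
        final String cartId;
        final String itemId;
        final int quantity;

        ItemAdded(String cartId, String itemId, int quantity) {
            this.cartId = cartId;
            this.itemId = itemId;
            this.quantity = quantity;
        }
    }

    // Hypothetical internal event that must never leave the service.
    static final class InternalAuditNote {
        final String text;

        InternalAuditNote(String text) {
            this.text = text;
        }
    }

    // Hypothetical public representation sent to consumers.
    static final class PublicItemAdded {
        final String cartId;
        final String itemId;
        final int quantity;

        PublicItemAdded(String cartId, String itemId, int quantity) {
            this.cartId = cartId;
            this.itemId = itemId;
            this.quantity = quantity;
        }
    }

    private final Map<Class<?>, Function<Object, Optional<Object>>> mappers = new HashMap<>();

    // Registers a mapper for one internal event class.
    <A, B> TransformationSketch registerMapper(Class<A> type, Function<A, Optional<B>> mapper) {
        mappers.put(type, event -> mapper.apply(type.cast(event)).map(b -> (Object) b));
        return this;
    }

    // Returns the public event, or empty when the event is omitted from the stream.
    Optional<Object> transform(Object event) {
        Function<Object, Optional<Object>> mapper = mappers.get(event.getClass());
        if (mapper == null) {
            // Assumption of this sketch: unregistered event types are rejected.
            throw new IllegalArgumentException(
                "No mapper registered for " + event.getClass().getName());
        }
        return mapper.apply(event);
    }

    static TransformationSketch example() {
        return new TransformationSketch()
            .registerMapper(ItemAdded.class,
                e -> Optional.of(new PublicItemAdded(e.cartId, e.itemId, e.quantity)))
            // Returning empty omits the event from the public stream.
            .registerMapper(InternalAuditNote.class, e -> Optional.empty());
    }

    public static void main(String[] args) {
        TransformationSketch transformation = example();
        System.out.println(transformation.transform(new ItemAdded("cart1", "socks", 2)).isPresent());
        System.out.println(transformation.transform(new InternalAuditNote("checked by ops")).isPresent());
    }
}
```

In this sketch the first call yields a public event and the second yields an empty result. The same pattern applies to the mappers registered in the producer example.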
That EventProducer
service is started in an Akka gRPC server like this:
- Scala
    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.duration._
    import scala.util.Failure
    import scala.util.Success

    import akka.actor.typed.ActorSystem
    import akka.grpc.scaladsl.ServerReflection
    import akka.grpc.scaladsl.ServiceHandler
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.HttpRequest
    import akka.http.scaladsl.model.HttpResponse

    object ShoppingCartServer {

      def start(
          interface: String,
          port: Int,
          system: ActorSystem[_],
          grpcService: proto.ShoppingCartService,
          eventProducerService: PartialFunction[HttpRequest, Future[HttpResponse]]): Unit = {
        implicit val sys: ActorSystem[_] = system
        implicit val ec: ExecutionContext = system.executionContext

        val service: HttpRequest => Future[HttpResponse] =
          ServiceHandler.concatOrNotFound(
            eventProducerService,
            proto.ShoppingCartServiceHandler.partial(grpcService),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.partial(List(proto.ShoppingCartService))
          )

        val bound =
          Http()
            .newServerAt(interface, port)
            .bind(service)
            .map(_.addToCoordinatedShutdown(3.seconds))

        bound.onComplete {
          case Success(binding) =>
            val address = binding.localAddress
            system.log.info(
              "Shopping online at gRPC server {}:{}",
              address.getHostString,
              address.getPort)
          case Failure(ex) =>
            system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
            system.terminate()
        }
      }
    }
- Java
    import akka.actor.typed.ActorSystem;
    import akka.grpc.javadsl.ServerReflection;
    import akka.grpc.javadsl.ServiceHandler;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.japi.function.Function;
    import java.net.InetSocketAddress;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.CompletionStage;
    import shopping.cart.proto.ShoppingCartService;
    import shopping.cart.proto.ShoppingCartServiceHandlerFactory;

    public final class ShoppingCartServer {

      private ShoppingCartServer() {}

      static void start(
          String host,
          int port,
          ActorSystem<?> system,
          ShoppingCartService grpcService,
          Function<HttpRequest, CompletionStage<HttpResponse>> eventProducerService) {
        @SuppressWarnings("unchecked")
        Function<HttpRequest, CompletionStage<HttpResponse>> service =
            ServiceHandler.concatOrNotFound(
                eventProducerService,
                ShoppingCartServiceHandlerFactory.create(grpcService, system),
                // ServerReflection enabled to support grpcurl without import-path and proto parameters
                ServerReflection.create(
                    Collections.singletonList(ShoppingCartService.description), system));

        CompletionStage<ServerBinding> bound =
            Http.get(system).newServerAt(host, port).bind(service::apply);

        bound.whenComplete(
            (binding, ex) -> {
              if (binding != null) {
                binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
                InetSocketAddress address = binding.localAddress();
                system
                    .log()
                    .info(
                        "Shopping online at gRPC server {}:{}",
                        address.getHostString(),
                        address.getPort());
              } else {
                system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
                system.terminate();
              }
            });
      }
    }
This example includes an application specific ShoppingCartService
, which is unrelated to Akka Projections gRPC, but it illustrates how to combine the EventProducer
service with other gRPC services.
Sample projects
Source code and build files for complete sample projects can be found in the akka/akka-projection GitHub repository.
Scala:
Java:
Access control
From the consumer
The consumer can pass metadata, such as auth headers, in each request to the producer service by passing Metadata to the GrpcQuerySettings when constructing the read journal.
In the producer
Authentication and authorization for the producer can be done by implementing an EventProducerInterceptor and passing it to the grpcServiceHandler method during producer bootstrap. The interceptor is invoked with the stream id and gRPC request metadata for each incoming request and can return a suitable error through GrpcServiceException.
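The shape of such a check can be sketched in plain Java. This is a stand-in that models the request metadata as a map and does not use the library types: the `Interceptor` interface, `AccessDeniedException` and `requireHeader` are hypothetical names. The header name and value follow the "x-auth-header": "secret" example from the reference configuration.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Model of a per-request access check; not the EventProducerInterceptor API.
final class InterceptorSketch {

    // Hypothetical stand-in for an error that carries a gRPC status to the caller.
    static final class AccessDeniedException extends RuntimeException {
        AccessDeniedException(String message) {
            super(message);
        }
    }

    // Hypothetical contract: invoked with the stream id and request metadata.
    interface Interceptor {
        CompletionStage<Void> intercept(String streamId, Map<String, String> requestMetadata);
    }

    // Accepts a request only when the given header carries the expected value.
    static Interceptor requireHeader(String headerName, String expectedValue) {
        return (streamId, requestMetadata) -> {
            CompletableFuture<Void> result = new CompletableFuture<>();
            if (expectedValue.equals(requestMetadata.get(headerName))) {
                result.complete(null);
            } else {
                result.completeExceptionally(
                    new AccessDeniedException("Access to stream [" + streamId + "] denied"));
            }
            return result;
        };
    }

    public static void main(String[] args) {
        Interceptor interceptor = requireHeader("x-auth-header", "secret");

        boolean allowed = !interceptor
            .intercept("cart", Collections.singletonMap("x-auth-header", "secret"))
            .toCompletableFuture()
            .isCompletedExceptionally();
        boolean denied = interceptor
            .intercept("cart", Collections.<String, String>emptyMap())
            .toCompletableFuture()
            .isCompletedExceptionally();

        System.out.println("allowed=" + allowed + ", denied=" + denied);
    }
}
```

A real interceptor would more likely validate a signed token than compare a shared secret, and could also decide per stream id. The asynchronous result leaves room for a lookup against an external service.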
Performance considerations
Lower latency
See Publish events for lower latency of eventsBySlices for low latency use cases.
Scalability limitations
Each connected consumer will start an eventsBySlices
query that will periodically poll and read events from the journal. That means that the journal database will become a bottleneck, unless it can be scaled out, as the number of consumers increases. The producer service itself can easily be scaled out to more instances.
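A rough estimate shows how the polling load grows. The sketch below assumes one eventsBySlices query per Projection instance, as in the consumer example with 4 instances, and a fixed poll interval. The interval and consumer counts are hypothetical inputs, not values from this page, and `QueryLoadSketch` is an illustrative name.

```java
// Back-of-envelope estimate of journal polling load; all inputs are hypothetical.
final class QueryLoadSketch {

    // Polls per second issued against the journal across all consumer streams.
    static double pollsPerSecond(int consumers, int streamsPerConsumer, double pollIntervalSeconds) {
        if (pollIntervalSeconds <= 0) {
            throw new IllegalArgumentException("pollIntervalSeconds must be positive");
        }
        return consumers * streamsPerConsumer / pollIntervalSeconds;
    }

    public static void main(String[] args) {
        int streamsPerConsumer = 4;       // Projection instances per consumer service
        double pollIntervalSeconds = 2.0; // assumed interval, for illustration only
        for (int consumers : new int[] {1, 10, 100}) {
            System.out.println(consumers + " consumers: "
                + pollsPerSecond(consumers, streamsPerConsumer, pollIntervalSeconds)
                + " polls per second");
        }
    }
}
```

The load grows linearly with the number of consumers. Adding producer instances does not reduce it, because every query still reaches the same journal database.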
For the case of many consumers of the same event stream a future improvement to reduce the database load would be to share results of the queries across the different consumers, since most of them are probably reading at the tail of the same event stream.
Configuration
Consumer configuration
The configuration for the GrpcReadJournal
may look like this:
akka.http.server.preview.enable-http2 = on

akka.projection.grpc.consumer {
  client {
    host = "127.0.0.1"
    port = 8101
    use-tls = false
  }
  stream-id = "cart"
}
The client
section in the configuration defines where the producer is running. It is an Akka gRPC configuration with several connection options.
Reference configuration
The following can be overridden in your application.conf
for the Projection specific settings:
akka.projection.grpc {
  consumer {
    class = "akka.projection.grpc.consumer.GrpcReadJournalProvider"

    # Note: these settings are only applied when constructing the consumer from config
    # if creating the GrpcQuerySettings programmatically these settings are ignored

    # Configuration of gRPC client.
    # See https://doc.akka.io/libraries/akka-grpc/current/client/configuration.html#by-configuration
    client = ${akka.grpc.client."*"}
    client {
    }

    # Mandatory field identifying the stream to consume/type of entity, must be a stream id
    # exposed by the producing/publishing side
    stream-id = ""

    # Pass these additional request headers as string values in each request to the producer
    # can be used for example for authorization in combination with an interceptor in the producer.
    # Example "x-auth-header": "secret"
    additional-request-headers {}
  }

  producer {
    # Query plugin for eventsBySlices, such as "akka.persistence.r2dbc.query".
    query-plugin-id = ""

    # When using async transformations it can be good to increase this.
    transformation-parallelism = 1
  }
}
Connecting to more than one producer
If you have several Projections that are connecting to different producer services they can be configured as separate GrpcReadJournal configuration sections.
consumer1 = ${akka.projection.grpc.consumer}
consumer1 {
  client {
    host = "127.0.0.1"
    port = 8101
  }
}

consumer2 = ${akka.projection.grpc.consumer}
consumer2 {
  client {
    host = "127.0.0.1"
    port = 8202
  }
}
The GrpcReadJournal
plugin id is then consumer1
and consumer2
instead of the default akka.projection.grpc.consumer
.