Akka Projection gRPC

Akka Projection gRPC can be used for implementing asynchronous event based service-to-service communication. It provides an implementation of an Akka Projection that uses Akka gRPC as underlying transport between event producer and consumer.

Warning

This module is currently marked as May Change in the sense that the API might be changed based on feedback from initial usage. However, the module is ready for usage in production and we will not break serialization format of messages or stored data.

Overview

overview.png

  1. An Entity stores events in its journal in service A.
  2. Consumer in service B starts an Akka Projection which locally reads its offset for service A’s replication stream.
  3. Service B establishes a replication stream from service A.
  4. Events are read from the journal.
  5. Event is emitted to the replication stream.
  6. Event is handled.
  7. Offset is stored.
  8. Producer continues to read new events from the journal and emit to the stream. As an optimization, events can also be published directly from the entity to the producer.

Dependencies

To use the R2DBC module of Akka Projections add the following dependency in your project:

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-projection-grpc" % "1.3.1"
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-projection-grpc_${scala.binary.version}</artifactId>
    <version>1.3.1</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-projection-grpc_${versions.ScalaBinary}:1.3.1"
}

Akka Projections require Akka 2.7.0 or later, see Akka version.

Project Info: Akka Projections gRPC
Artifact
com.lightbend.akka
akka-projection-grpc
1.3.1
JDK versions
AdoptOpenJDK 8
AdoptOpenJDK 11
Scala versions2.13.10, 2.12.17
JPMS module nameakka.projection.grpc
License
Readiness level
Since 1.3.0, 2020-10-23
Home pagehttps://akka.io
API documentation
Forums
Release notesGitHub releases
IssuesGitHub issues
Sourceshttps://github.com/akka/akka-projection

Transitive dependencies

The table below shows akka-projection-grpc’s direct dependencies, and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.lightbend.akka.grpcakka-grpc-runtime_2.132.2.1
com.lightbend.akkaakka-projection-core_2.131.3.1
com.lightbend.akkaakka-projection-eventsourced_2.131.3.1
com.thesamet.scalapbscalapb-runtime_2.130.11.11
com.typesafe.akkaakka-actor-typed_2.132.7.0
com.typesafe.akkaakka-persistence-query_2.132.7.0
com.typesafe.akkaakka-persistence-typed_2.132.7.0
com.typesafe.akkaakka-stream_2.132.7.0
io.grpcgrpc-stub1.48.1
org.scala-langscala-library2.13.10
Dependency tree
com.lightbend.akka.grpc    akka-grpc-runtime_2.13    2.2.1    BUSL-1.1
    com.google.protobuf    protobuf-java    3.21.9
    com.thesamet.scalapb    scalapb-runtime_2.13    0.11.11    Apache 2
        com.google.protobuf    protobuf-java    3.21.9
        com.thesamet.scalapb    lenses_2.13    0.11.11    Apache 2
            org.scala-lang.modules    scala-collection-compat_2.13    2.7.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang.modules    scala-collection-compat_2.13    2.7.0    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-discovery_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-http-core_2.13    10.4.0    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.4.0    BUSL-1.1
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.4.0    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.4.0    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.4.0    BUSL-1.1
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    io.grpc    grpc-core    1.48.1    Apache 2.0
        com.google.android    annotations    4.1.1.4    Apache 2.0
        com.google.code.gson    gson    2.9.0    Apache-2.0
        com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
        com.google.guava    guava    31.1-android
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
            org.checkerframework    checker-qual    3.12.0    The MIT License
        io.grpc    grpc-api    1.48.1    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    guava    31.1-android
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                org.checkerframework    checker-qual    3.12.0    The MIT License
            io.grpc    grpc-context    1.48.1    Apache 2.0
        io.perfmark    perfmark-api    0.25.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.21
    io.grpc    grpc-netty-shaded    1.48.1    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
        com.google.guava    guava    31.1-android
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
            org.checkerframework    checker-qual    3.12.0    The MIT License
        io.grpc    grpc-core    1.48.1    Apache 2.0
            com.google.android    annotations    4.1.1.4    Apache 2.0
            com.google.code.gson    gson    2.9.0    Apache-2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    guava    31.1-android
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                org.checkerframework    checker-qual    3.12.0    The MIT License
            io.grpc    grpc-api    1.48.1    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    guava    31.1-android
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                    org.checkerframework    checker-qual    3.12.0    The MIT License
                io.grpc    grpc-context    1.48.1    Apache 2.0
            io.perfmark    perfmark-api    0.25.0    Apache 2.0
            org.codehaus.mojo    animal-sniffer-annotations    1.21
        io.perfmark    perfmark-api    0.25.0    Apache 2.0
    io.grpc    grpc-protobuf    1.48.1    Apache 2.0
        com.google.api.grpc    proto-google-common-protos    2.9.0    Apache-2.0
            com.google.protobuf    protobuf-java    3.21.9
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.guava    guava    31.1-android
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
            org.checkerframework    checker-qual    3.12.0    The MIT License
        com.google.protobuf    protobuf-java    3.21.9
        io.grpc    grpc-api    1.48.1    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    guava    31.1-android
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                org.checkerframework    checker-qual    3.12.0    The MIT License
            io.grpc    grpc-context    1.48.1    Apache 2.0
        io.grpc    grpc-protobuf-lite    1.48.1    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.guava    guava    31.1-android
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    failureaccess    1.0.1
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                org.checkerframework    checker-qual    3.12.0    The MIT License
            io.grpc    grpc-api    1.48.1    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                com.google.guava    guava    31.1-android
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.1
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
                    com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
                    org.checkerframework    checker-qual    3.12.0    The MIT License
                io.grpc    grpc-context    1.48.1    Apache 2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.lightbend.akka    akka-projection-core_2.13    1.3.1
    com.typesafe.akka    akka-actor-typed_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.slf4j    slf4j-api    1.7.36
        org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.slf4j    slf4j-api    1.7.36
    com.typesafe.akka    akka-persistence-query_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
                com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.lightbend.akka    akka-projection-eventsourced_2.13    1.3.1
    com.lightbend.akka    akka-projection-core_2.13    1.3.1
        com.typesafe.akka    akka-actor-typed_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-slf4j_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.slf4j    slf4j-api    1.7.36
            org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.slf4j    slf4j-api    1.7.36
        com.typesafe.akka    akka-persistence-query_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
                    com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                        com.typesafe    config    1.4.2    Apache-2.0
                        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                            org.scala-lang    scala-library    2.13.10    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
                    com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                        com.typesafe    config    1.4.2    Apache-2.0
                        org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                            org.scala-lang    scala-library    2.13.10    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.reactivestreams    reactive-streams    1.0.4    MIT-0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
                com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-persistence-query_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
                com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.thesamet.scalapb    scalapb-runtime_2.13    0.11.11    Apache 2
    com.google.protobuf    protobuf-java    3.21.9
    com.thesamet.scalapb    lenses_2.13    0.11.11    Apache 2
        org.scala-lang.modules    scala-collection-compat_2.13    2.7.0    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang.modules    scala-collection-compat_2.13    2.7.0    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.typesafe.akka    akka-actor-typed_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-slf4j_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.slf4j    slf4j-api    1.7.36
    org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.slf4j    slf4j-api    1.7.36
com.typesafe.akka    akka-persistence-query_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.typesafe.akka    akka-persistence-typed_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-actor-typed_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-slf4j_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.slf4j    slf4j-api    1.7.36
        org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.slf4j    slf4j-api    1.7.36
    com.typesafe.akka    akka-persistence-query_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
                com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.reactivestreams    reactive-streams    1.0.4    MIT-0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-persistence_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-remote_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
            com.typesafe    config    1.4.2    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.7.0    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.slf4j    slf4j-api    1.7.36
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.agrona    agrona    1.16.0    The Apache License, Version 2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-stream-typed_2.13    2.7.0    BUSL-1.1
        com.typesafe.akka    akka-actor-typed_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-slf4j_2.13    2.7.0    BUSL-1.1
                com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                    com.typesafe    config    1.4.2    Apache-2.0
                    org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                        org.scala-lang    scala-library    2.13.10    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.slf4j    slf4j-api    1.7.36
            org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.slf4j    slf4j-api    1.7.36
        com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
            com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
            com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
                com.typesafe    config    1.4.2    Apache-2.0
                org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
                    org.scala-lang    scala-library    2.13.10    Apache-2.0
                org.scala-lang    scala-library    2.13.10    Apache-2.0
            org.reactivestreams    reactive-streams    1.0.4    MIT-0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.7.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.7.0    BUSL-1.1
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.0    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.7.0    BUSL-1.1
    com.typesafe    ssl-config-core_2.13    0.4.3    Apache-2.0
        com.typesafe    config    1.4.2    Apache-2.0
        org.scala-lang.modules    scala-parser-combinators_2.13    1.1.2    Apache-2.0
            org.scala-lang    scala-library    2.13.10    Apache-2.0
        org.scala-lang    scala-library    2.13.10    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.10    Apache-2.0
io.grpc    grpc-stub    1.48.1    Apache 2.0
    com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
    com.google.guava    guava    31.1-android
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
        com.google.guava    failureaccess    1.0.1
        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
        com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
        org.checkerframework    checker-qual    3.12.0    The MIT License
    io.grpc    grpc-api    1.48.1    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
        com.google.guava    guava    31.1-android
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.14.0    Apache 2.0
            com.google.guava    failureaccess    1.0.1
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava
            com.google.j2objc    j2objc-annotations    1.3    The Apache Software License, Version 2.0
            org.checkerframework    checker-qual    3.12.0    The MIT License
        io.grpc    grpc-context    1.48.1    Apache 2.0
org.scala-lang    scala-library    2.13.10    Apache-2.0

Consumer

On the consumer side the Projection is an ordinary SourceProvider for eventsBySlices that is using eventsBySlices from the GrpcReadJournalGrpcReadJournal.

Scala
sourceimport scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.persistence.Persistence
import akka.persistence.query.typed.EventEnvelope
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.Handler
import org.slf4j.LoggerFactory
import shoppingcart.CheckedOut
import shoppingcart.ItemAdded
import shoppingcart.ItemQuantityAdjusted
import shoppingcart.ItemRemoved
import shoppingcart.ShoppingCartEventsProto

object ShoppingCartEventConsumer {
  def init(system: ActorSystem[_]): Unit = {
    implicit val sys: ActorSystem[_] = system
    val numberOfProjectionInstances = 4
    val projectionName: String = "cart-events"
    val sliceRanges =
      Persistence(system).sliceRanges(numberOfProjectionInstances)

    val eventsBySlicesQuery =
      GrpcReadJournal(List(ShoppingCartEventsProto.javaDescriptor))

    ShardedDaemonProcess(system).init(
      projectionName,
      numberOfProjectionInstances,
      { idx =>
        val sliceRange = sliceRanges(idx)
        val projectionKey =
          s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}"
        val projectionId = ProjectionId.of(projectionName, projectionKey)

        val sourceProvider = EventSourcedProvider.eventsBySlices[AnyRef](
          system,
          eventsBySlicesQuery,
          eventsBySlicesQuery.streamId,
          sliceRange.min,
          sliceRange.max)

        ProjectionBehavior(
          R2dbcProjection.atLeastOnceAsync(
            projectionId,
            None,
            sourceProvider,
            () => new EventHandler(projectionId)))
      })
  }

}
Java
sourceimport akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
import akka.japi.Pair;
import akka.persistence.Persistence;
import akka.persistence.query.typed.EventEnvelope;
import akka.projection.ProjectionBehavior;
import akka.projection.ProjectionId;
import akka.projection.eventsourced.javadsl.EventSourcedProvider;
import akka.projection.grpc.consumer.javadsl.GrpcReadJournal;
import akka.projection.javadsl.SourceProvider;
import akka.projection.r2dbc.javadsl.R2dbcProjection;
import shopping.cart.proto.ShoppingCartEvents;

class ShoppingCartEventConsumer {
  public static void init(ActorSystem<?> system) {
    int numberOfProjectionInstances = 4;
    String projectionName = "cart-events";
    List<Pair<Integer, Integer>> sliceRanges = Persistence.get(system).getSliceRanges(numberOfProjectionInstances);

    GrpcReadJournal eventsBySlicesQuery = GrpcReadJournal.create(
        system,
        List.of(ShoppingCartEvents.getDescriptor()));

    ShardedDaemonProcess.get(system).init(
        ProjectionBehavior.Command.class,
        projectionName,
        numberOfProjectionInstances,
        idx -> {
          Pair<Integer, Integer> sliceRange = sliceRanges.get(idx);
          String projectionKey = eventsBySlicesQuery.streamId() + "-" + sliceRange.first() + "-" + sliceRange.second();
          ProjectionId projectionId = ProjectionId.of(projectionName, projectionKey);

          SourceProvider<Offset, EventEnvelope<Object>> sourceProvider = EventSourcedProvider.eventsBySlices(
              system,
              eventsBySlicesQuery,
              eventsBySlicesQuery.streamId(),
              sliceRange.first(),
              sliceRange.second());

          return ProjectionBehavior.create(
              R2dbcProjection.atLeastOnceAsync(
                  projectionId,
                  Optional.empty(),
                  sourceProvider,
                  () -> new EventHandler(projectionId),
                  system));

        },
        ProjectionBehavior.stopMessage());
  }

}

The Protobuf descriptors are defined when the GrpcReadJournalGrpcReadJournal is created. The descriptors are used when deserializing the received events. The protobufDescriptors is a list of the javaDescriptor for the used protobuf messages. It is defined in the ScalaPB generated Proto companion object. Note that GrpcReadJournal should be created with the GrpcReadJournalGrpcReadJournal applycreate factory method and not from configuration via GrpcReadJournalProvider when using Protobuf serialization.

The gRPC connection to the producer is defined in the consumer configuration.

The R2dbcProjection has support for storing the offset in a relational database using R2DBC.

The above example is using the ShardedDaemonProcess to distribute the instances of the Projection across the cluster. There are alternative ways of running the ProjectionBehavior as described in Running a Projection

How to implement the EventHandler and choose between different processing semantics is described in the R2dbcProjection documentation.

gRPC client lifecycle

When creating the GrpcReadJournalGrpcReadJournal a gRPC client is created for the target producer. The same GrpcReadJournal instance and its gRPC client should be shared for the same target producer. The code examples above will share the instance between different Projection instances running in the same ActorSystem. The gRPC clients will automatically be closed when the ActorSystem is terminated.

If there is a need to close the gRPC client before ActorSystem termination the close() method of the GrpcReadJournalGrpcReadJournal can be called. After closing the GrpcReadJournal instance cannot be used again.

Producer

Akka Projections gRPC provides the gRPC service implementation that is used by the consumer side. It is created with the EventProducerEventProducer:

Scala
sourceimport akka.actor.typed.ActorSystem
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.projection.grpc.producer.EventProducerSettings
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation

import scala.concurrent.Future

def eventProducerService(system: ActorSystem[_]): PartialFunction[HttpRequest, Future[HttpResponse]] = {
  val transformation = Transformation.empty
    .registerMapper[ShoppingCart.ItemAdded, proto.ItemAdded](event => Some(transformItemAdded(event)))
    .registerMapper[ShoppingCart.ItemQuantityAdjusted, proto.ItemQuantityAdjusted](event => Some(transformItemQuantityAdjusted(event)))
    .registerMapper[ShoppingCart.ItemRemoved, proto.ItemRemoved](event => Some(transformItemRemoved(event)))
    .registerMapper[ShoppingCart.CheckedOut, proto.CheckedOut](event => Some(transformCheckedOut(event)))

  val eventProducerSource = EventProducer.EventProducerSource(
    "ShoppingCart",
    "cart",
    transformation,
    EventProducerSettings(system)
  )

  EventProducer.grpcServiceHandler(eventProducerSource)(system)
}
Java
sourceimport akka.actor.typed.ActorSystem;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.function.Function;
import akka.projection.grpc.producer.EventProducerSettings;
import akka.projection.grpc.producer.javadsl.EventProducer;
import akka.projection.grpc.producer.javadsl.EventProducerSource;
import akka.projection.grpc.producer.javadsl.Transformation;

import java.util.Optional;
import java.util.concurrent.CompletionStage;

public static Function<HttpRequest, CompletionStage<HttpResponse>> eventProducerService(ActorSystem<?> system) {
  Transformation transformation =
      Transformation.empty()
          .registerMapper(ShoppingCart.ItemAdded.class, event -> Optional.of(transformItemAdded(event)))
          .registerMapper(ShoppingCart.ItemQuantityAdjusted.class, event -> Optional.of(transformItemQuantityAdjusted(event)))
          .registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event)))
          .registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event)));

  EventProducerSource eventProducerSource = new EventProducerSource(
      "ShoppingCart",
      "cart",
      transformation,
      EventProducerSettings.apply(system)
  );

  return EventProducer.grpcServiceHandler(system, eventProducerSource);
}

Events can be transformed by application specific code on the producer side. The purpose is to be able to have a different public representation from the internal representation (stored in journal). The transformation functions are registered when creating the EventProducer service. Here is an example of one of those transformation functions:

Scala
sourceprivate def transformItemAdded(added: ShoppingCart.ItemAdded): proto.ItemAdded =
  proto.ItemAdded(
    cartId = added.cartId,
    itemId = added.itemId,
    quantity = added.quantity)
Java
sourceprivate static shopping.cart.proto.ItemAdded transformItemAdded(ShoppingCart.ItemAdded itemAdded) {
  return shopping.cart.proto.ItemAdded.newBuilder()
      .setCartId(itemAdded.cartId)
      .setItemId(itemAdded.itemId)
      .setQuantity(itemAdded.quantity)
      .build();
}

To omit an event the transformation function can return NoneOptional.empty().

That EventProducer service is started in an Akka gRPC server like this:

Scala
sourceimport scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success

import akka.actor.typed.ActorSystem
import akka.grpc.scaladsl.ServerReflection
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse

object ShoppingCartServer {

  def start(
      interface: String,
      port: Int,
      system: ActorSystem[_],
      grpcService: proto.ShoppingCartService,
      eventProducerService: PartialFunction[HttpRequest, Future[HttpResponse]]): Unit = {
    implicit val sys: ActorSystem[_] = system
    implicit val ec: ExecutionContext =
      system.executionContext

    val service: HttpRequest => Future[HttpResponse] =
      ServiceHandler.concatOrNotFound(
        eventProducerService,
        proto.ShoppingCartServiceHandler.partial(grpcService),
        // ServerReflection enabled to support grpcurl without import-path and proto parameters
        ServerReflection.partial(List(proto.ShoppingCartService))
      )

    val bound =
      Http()
        .newServerAt(interface, port)
        .bind(service)
        .map(_.addToCoordinatedShutdown(3.seconds))

    bound.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        system.log.info(
          "Shopping online at gRPC server {}:{}",
          address.getHostString,
          address.getPort)
      case Failure(ex) =>
        system.log.error("Failed to bind gRPC endpoint, terminating system", ex)
        system.terminate()
    }
  }

}
Java
sourceimport akka.actor.typed.ActorSystem;
import akka.grpc.javadsl.ServerReflection;
import akka.grpc.javadsl.ServiceHandler;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.japi.function.Function;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletionStage;
import shopping.cart.proto.ShoppingCartService;
import shopping.cart.proto.ShoppingCartServiceHandlerFactory;

public final class ShoppingCartServer {

  private ShoppingCartServer() {}

  static void start(String host, int port, ActorSystem<?> system, ShoppingCartService grpcService, Function<HttpRequest, CompletionStage<HttpResponse>> eventProducerService) {
    @SuppressWarnings("unchecked")
    Function<HttpRequest, CompletionStage<HttpResponse>> service =
        ServiceHandler.concatOrNotFound(
            eventProducerService,
            ShoppingCartServiceHandlerFactory.create(grpcService, system),
            // ServerReflection enabled to support grpcurl without import-path and proto parameters
            ServerReflection.create(
                Collections.singletonList(ShoppingCartService.description), system));

    CompletionStage<ServerBinding> bound =
        Http.get(system).newServerAt(host, port).bind(service::apply);

    bound.whenComplete(
        (binding, ex) -> {
          if (binding != null) {
            binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
            InetSocketAddress address = binding.localAddress();
            system
                .log()
                .info(
                    "Shopping online at gRPC server {}:{}",
                    address.getHostString(),
                    address.getPort());
          } else {
            system.log().error("Failed to bind gRPC endpoint, terminating system", ex);
            system.terminate();
          }
        });
  }
}

This example includes an application specific ShoppingCartService, which is unrelated to Akka Projections gRPC, but it illustrates how to combine the EventProducer service with other gRPC services.

Sample projects

Source code and build files for complete sample projects can be found in akka/akka-projection GitHub repository.

Scala:

Java:

Access control

From the consumer

The consumer can pass metadata, such as auth headers, in each request to the producer service by passing MetadataMetadata to the GrpcQuerySettingsGrpcQuerySettings when constructing the read journal.

In the producer

Authentication and authorization for the producer can be done by implementing a EventProducerInterceptorEventProducerInterceptor and pass it to the grpcServiceHandler method during producer bootstrap. The interceptor is invoked with the stream id and gRPC request metadata for each incoming request and can return a suitable error through GrpcServiceExceptionGrpcServiceException

Performance considerations

Lower latency

See Publish events for lower latency of eventsBySlices for low latency use cases.

Scalability limitations

Each connected consumer will start a eventsBySlices query that will periodically poll and read events from the journal. That means that the journal database will become a bottleneck, unless it can be scaled out, when number of consumers increase. The producer service itself can easily be scaled out to more instances.

For the case of many consumers of the same event stream a future improvement to reduce the database load would be to share results of the queries across the different consumers, since most of them are probably reading at the tail of the same event stream.

Configuration

Consumer configuration

The configuration for the GrpcReadJournal may look like this:

sourceakka.http.server.preview.enable-http2 = on

akka.projection.grpc.consumer {
  client {
    host = "127.0.0.1"
    port = 8101
    use-tls = false
  }
  stream-id = "cart"
}

The client section in the configuration defines where the producer is running. It is an Akka gRPC configuration with several connection options.

Reference configuration

The following can be overridden in your application.conf for the Projection specific settings:

sourceakka.projection.grpc {
  consumer {
    class = "akka.projection.grpc.consumer.GrpcReadJournalProvider"

    # Note: these settings are only applied when constructing the consumer from config
    #       if creating the GrpcQuerySettings programmatically these settings are ignored

    # Configuration of gRPC client.
    # See https://doc.akka.io/docs/akka-grpc/current/client/configuration.html#by-configuration
    client = ${akka.grpc.client."*"}
    client {
    }

    # Mandatory field identifying the stream to consume/type of entity, must be a stream id
    # exposed by the producing/publishing side
    stream-id = ""

    # Pass these additional request headers as string values in each request to the producer
    # can be used for example for authorization in combination with an interceptor in the producer.
    # Example "x-auth-header": "secret"
    additional-request-headers {}
  }

  producer {
    # Query plugin for eventsBySlices, such as "akka.persistence.r2dbc.query".
    query-plugin-id = ""

    # When using async transformations it can be good to increase this.
    transformation-parallelism = 1

  }


}

Connecting to more than one producer

If you have several Projections that are connecting to different producer services they can be configured as separate GrpcReadJournalGrpcReadJournal configuration sections.

consumer1 = ${akka.projection.grpc.consumer}
consumer1 {
  client {
    host = "127.0.0.1"
    port = 8101
  }
}

consumer2 = ${akka.projection.grpc.consumer}
consumer2 {
  client {
    host = "127.0.0.1"
    port = 8202
  }
}

The GrpcReadJournal plugin id is then consumer1 and consumer2 instead of the default akka.projection.grpc.consumer.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.