Google Cloud Pub/Sub gRPC

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information is available at the official Google Cloud documentation website.

This connector communicates with Pub/Sub via the gRPC protocol. The integration between Akka Streams and gRPC is handled by Akka gRPC 2.5. For a connector that uses HTTP for communication, see the alternative Alpakka Google Cloud Pub/Sub connector.

Project Info: Alpakka Google Cloud PubSub (gRPC)
Artifact: com.lightbend.akka, akka-stream-alpakka-google-cloud-pub-sub-grpc, 10.0.1
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.17, 3.3.7
JPMS module name: akka.stream.alpakka.google.cloud.pubsub.grpc
License
Readiness level: Since 1.0-M1, 2018-11-06
Home page: https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

Note

The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.
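
As a minimal sketch of how the tokenized repository could be wired into an sbt build without committing the token to source control: the environment variable name `AKKA_REPO_URL` and the resolver label are assumptions made for illustration, not names defined by Akka. The variable is expected to hold the full tokenized URL obtained from https://account.akka.io/token.

```scala
// build.sbt (sketch)
// AKKA_REPO_URL is a hypothetical environment variable that holds the
// complete tokenized repository URL copied from https://account.akka.io/token.
// If the variable is not set, no resolver is added and dependency
// resolution for the Akka artifacts is expected to fail.
resolvers ++= sys.env
  .get("AKKA_REPO_URL")
  .map(url => "Akka library repository".at(url))
  .toSeq
```

Reading the URL from the environment keeps the token out of the build file; the same idea applies to Maven and Gradle through their own property mechanisms.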

Additionally, add the dependencies below. Because Akka gRPC uses Akka Discovery internally, make sure to add Akka Discovery with the same Akka version that the application uses.

sbt
val AkkaVersion = "2.10.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub-grpc" % "10.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-discovery" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.11</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-google-cloud-pub-sub-grpc_${scala.binary.version}</artifactId>
    <version>10.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-discovery_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.11",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-pub-sub-grpc_${versions.ScalaBinary}:10.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-discovery_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.google.auth    google-auth-library-oauth2-http    1.39.1
com.google.protobuf    protobuf-java    3.25.8
com.lightbend.akka.grpc    akka-grpc-runtime_2.13    2.5.8
com.lightbend.akka    akka-stream-alpakka-google-common_2.13    10.0.1
com.thesamet.scalapb    scalapb-runtime_2.13    0.11.18
com.typesafe.akka    akka-discovery_2.13    2.10.11
com.typesafe.akka    akka-stream_2.13    2.10.11
io.grpc    grpc-auth    1.76.0
io.grpc    grpc-stub    1.76.0
org.scala-lang    scala-library    2.13.17
Dependency tree
com.google.auth    google-auth-library-oauth2-http    1.39.1    BSD New license
    com.google.api    api-common    2.53.0    BSD-3-Clause
        com.google.auto.value    auto-value-annotations    1.11.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
        javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
    com.google.auth    google-auth-library-credentials    1.39.1    BSD New license
    com.google.auto.value    auto-value-annotations    1.11.0    Apache 2.0
    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
    com.google.code.gson    gson    2.12.1    Apache-2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
    com.google.http-client    google-http-client-gson    1.47.0    The Apache Software License, Version 2.0
        com.google.code.gson    gson    2.12.1    Apache-2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.http-client    google-http-client    1.47.0    The Apache Software License, Version 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            io.grpc    grpc-context    1.76.0    Apache 2.0
                io.grpc    grpc-api    1.76.0    Apache 2.0
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.opencensus    opencensus-api    0.31.1    The Apache License, Version 2.0
                io.grpc    grpc-context    1.76.0    Apache 2.0
                    io.grpc    grpc-api    1.76.0    Apache 2.0
                        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.opencensus    opencensus-contrib-http-util    0.31.1    The Apache License, Version 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
                io.opencensus    opencensus-api    0.31.1    The Apache License, Version 2.0
                    io.grpc    grpc-context    1.76.0    Apache 2.0
                        io.grpc    grpc-api    1.76.0    Apache 2.0
                            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            org.apache.httpcomponents    httpclient    4.5.14    Apache License, Version 2.0
                commons-codec    commons-codec    1.11    Apache License, Version 2.0
                org.apache.httpcomponents    httpcore    4.4.16    Apache License, Version 2.0
            org.apache.httpcomponents    httpcore    4.4.16    Apache License, Version 2.0
    com.google.http-client    google-http-client    1.47.0    The Apache Software License, Version 2.0
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
        io.grpc    grpc-context    1.76.0    Apache 2.0
            io.grpc    grpc-api    1.76.0    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.opencensus    opencensus-api    0.31.1    The Apache License, Version 2.0
            io.grpc    grpc-context    1.76.0    Apache 2.0
                io.grpc    grpc-api    1.76.0    Apache 2.0
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.opencensus    opencensus-contrib-http-util    0.31.1    The Apache License, Version 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.opencensus    opencensus-api    0.31.1    The Apache License, Version 2.0
                io.grpc    grpc-context    1.76.0    Apache 2.0
                    io.grpc    grpc-api    1.76.0    Apache 2.0
                        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        org.apache.httpcomponents    httpclient    4.5.14    Apache License, Version 2.0
            commons-codec    commons-codec    1.11    Apache License, Version 2.0
            org.apache.httpcomponents    httpcore    4.4.16    Apache License, Version 2.0
        org.apache.httpcomponents    httpcore    4.4.16    Apache License, Version 2.0
com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
com.lightbend.akka.grpc    akka-grpc-runtime_2.13    2.5.8    BUSL-1.1
    com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
    com.thesamet.scalapb    scalapb-runtime_2.13    0.11.18    Apache 2
        com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
        com.thesamet.scalapb    lenses_2.13    0.11.18    Apache 2
            org.scala-lang.modules    scala-collection-compat_2.13    2.12.0    Apache-2.0
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang.modules    scala-collection-compat_2.13    2.12.0    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-discovery_2.13    2.10.11    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
            com.typesafe    config    1.4.5    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-http-core_2.13    10.7.3    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.7.3    BUSL-1.1
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.7.3    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.3    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.3    BUSL-1.1
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.11    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
                com.typesafe    config    1.4.5    Apache-2.0
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.slf4j    slf4j-api    2.0.17    MIT
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-pki_2.13    2.10.11    BUSL-1.1
        com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
            com.typesafe    config    1.4.5    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.slf4j    slf4j-api    2.0.17    MIT
    com.typesafe.akka    akka-stream_2.13    2.10.11    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
            com.typesafe    config    1.4.5    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.11    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    io.grpc    grpc-core    1.76.0    Apache 2.0
        com.google.android    annotations    4.1.1.4    Apache 2.0
        com.google.code.gson    gson    2.12.1    Apache-2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.grpc    grpc-api    1.76.0    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.grpc    grpc-context    1.76.0    Apache 2.0
            io.grpc    grpc-api    1.76.0    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.perfmark    perfmark-api    0.27.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
    io.grpc    grpc-netty-shaded    1.76.0    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.grpc    grpc-api    1.76.0    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.grpc    grpc-core    1.76.0    Apache 2.0
            com.google.android    annotations    4.1.1.4    Apache 2.0
            com.google.code.gson    gson    2.12.1    Apache-2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.grpc    grpc-api    1.76.0    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.grpc    grpc-context    1.76.0    Apache 2.0
                io.grpc    grpc-api    1.76.0    Apache 2.0
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.perfmark    perfmark-api    0.27.0    Apache 2.0
            org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
        io.grpc    grpc-util    1.76.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.grpc    grpc-api    1.76.0    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.grpc    grpc-core    1.76.0    Apache 2.0
                com.google.android    annotations    4.1.1.4    Apache 2.0
                com.google.code.gson    gson    2.12.1    Apache-2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
                io.grpc    grpc-api    1.76.0    Apache 2.0
                    com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
                io.grpc    grpc-context    1.76.0    Apache 2.0
                    io.grpc    grpc-api    1.76.0    Apache 2.0
                        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
                io.perfmark    perfmark-api    0.27.0    Apache 2.0
                org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
            org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
        io.perfmark    perfmark-api    0.27.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
    io.grpc    grpc-protobuf    1.76.0    Apache 2.0
        com.google.api.grpc    proto-google-common-protos    2.59.2    Apache-2.0
            com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
        io.grpc    grpc-api    1.76.0    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
        io.grpc    grpc-protobuf-lite    1.76.0    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
            com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
            io.grpc    grpc-api    1.76.0    Apache 2.0
                com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
                    com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
                    com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
                    com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
                    org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
com.lightbend.akka    akka-stream-alpakka-google-common_2.13    10.0.1
    com.github.jwt-scala    jwt-json-common_2.13    9.4.6    Apache-2.0
        com.github.jwt-scala    jwt-core_2.13    9.4.6    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.google.auth    google-auth-library-credentials    1.39.1    BSD New license
    com.typesafe.akka    akka-http-spray-json_2.13    10.7.3    BUSL-1.1
        com.typesafe.akka    akka-http_2.13    10.7.3    BUSL-1.1
            com.typesafe.akka    akka-http-core_2.13    10.7.3    BUSL-1.1
                com.typesafe.akka    akka-parsing_2.13    10.7.3    BUSL-1.1
                    org.scala-lang    scala-library    2.13.17    Apache-2.0
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            com.typesafe.akka    akka-pki_2.13    2.10.11    BUSL-1.1
                com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
                com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
                    com.typesafe    config    1.4.5    Apache-2.0
                    org.scala-lang    scala-library    2.13.17    Apache-2.0
                org.scala-lang    scala-library    2.13.17    Apache-2.0
                org.slf4j    slf4j-api    2.0.17    MIT
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        io.spray    spray-json_2.13    1.3.6    Apache 2
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.7.3    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.3    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.3    BUSL-1.1
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.11    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
                com.typesafe    config    1.4.5    Apache-2.0
                org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
            org.slf4j    slf4j-api    2.0.17    MIT
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-stream_2.13    2.10.11    BUSL-1.1
        com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
            com.typesafe    config    1.4.5    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.13    2.10.11    BUSL-1.1
        org.reactivestreams    reactive-streams    1.0.4    MIT-0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
com.thesamet.scalapb    scalapb-runtime_2.13    0.11.18    Apache 2
    com.google.protobuf    protobuf-java    3.25.8    BSD-3-Clause
    com.thesamet.scalapb    lenses_2.13    0.11.18    Apache 2
        org.scala-lang.modules    scala-collection-compat_2.13    2.12.0    Apache-2.0
            org.scala-lang    scala-library    2.13.17    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.scala-lang.modules    scala-collection-compat_2.13    2.12.0    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
com.typesafe.akka    akka-discovery_2.13    2.10.11    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
        com.typesafe    config    1.4.5    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.10.11    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
        com.typesafe    config    1.4.5    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.11    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
io.grpc    grpc-auth    1.76.0    Apache 2.0
    com.google.auth    google-auth-library-credentials    1.39.1    BSD New license
    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
    io.grpc    grpc-api    1.76.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
io.grpc    grpc-stub    1.76.0    Apache 2.0
    com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
    com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
        com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
        com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
        org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
    io.grpc    grpc-api    1.76.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.2    The Apache Software License, Version 2.0
        com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
        com.google.guava    guava    33.4.8-android    Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.38.0    Apache 2.0
            com.google.guava    failureaccess    1.0.3    Apache License, Version 2.0
            com.google.guava    listenablefuture    9999.0-empty-to-avoid-conflict-with-guava    The Apache Software License, Version 2.0
            com.google.j2objc    j2objc-annotations    3.0.0    Apache License, Version 2.0
            org.jspecify    jspecify    1.0.0    The Apache License, Version 2.0
    org.codehaus.mojo    animal-sniffer-annotations    1.24    MIT license
org.scala-lang    scala-library    2.13.17    Apache-2.0

Binary compatibility

Warning

This connector contains code generated from Protobuf files that is bound to Akka gRPC 2.5, which makes it NOT binary-compatible with later versions of Akka gRPC. You cannot use a different version of Akka gRPC within the same JVM instance.

Build setup

The Alpakka Google Cloud Pub/Sub gRPC library contains the classes generated from Google’s protobuf specification.

Configuration

The Pub/Sub gRPC connector shares its basic configuration with all the Google connectors in Alpakka. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.

The defaults can be changed (for example when testing against the emulator) by overriding the reference configuration, for example in your application.conf:

reference.conf
alpakka.google.credentials.default-scopes = ${?alpakka.google.credentials.default-scopes} ["https://www.googleapis.com/auth/pubsub"]

alpakka.google.cloud.pubsub.grpc {
  host = "pubsub.googleapis.com"
  port = 443
  # Set to "false" to disable TLS
  use-tls = true

  # Set to "none" to use the system default CA
  rootCa = "none"

  # Deprecated, use config path alpakka.google.credentials.provider
  callCredentials = deprecated
}
Test Configuration
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  loglevel = "DEBUG"
}

alpakka.google.cloud.pubsub.grpc {
  # To run the IntegrationSpec against Google Cloud:
  # * go to the console at https://console.cloud.google.com
  # * Create a compute engine service account as documented at https://cloud.google.com/docs/authentication/production#creating_a_service_account
  # * Point GOOGLE_APPLICATION_CREDENTIALS to the downloaded JSON key and start sbt
  # * Create a project, and update IntegrationSpec to use that project ID rather than "alpakka"
  # * Under 'Pub/Sub', 'Topics' create a topic 'simpleTopic' with a Google-managed key
  # * Under 'Pub/Sub', 'Subscriptions' create a subscription 'simpleSubscription' for this topic
  # * For 'republish', also create 'testTopic' and 'testSubscription'
  # * Comment out these test settings:

  host = "localhost"
  port = 8538
  use-tls = false # no TLS
  rootCa = "none"
  callCredentials = "none" # no authentication
}

For more configuration details, see the underlying configuration for Akka gRPC.

A manually initialized GrpcPublisher or GrpcSubscriber can be used by providing it as an attribute to the stream:

Scala
val settings = PubSubSettings(system)
val publisher = GrpcPublisher(settings)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub
    .publish(parallelism = 1)
    .withAttributes(PubSubAttributes.publisher(publisher))
Java
final PubSubSettings settings = PubSubSettings.create(system);
final GrpcPublisher publisher = GrpcPublisher.create(settings, system);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow =
    GooglePubSub.publish(1).withAttributes(PubSubAttributes.publisher(publisher));

Publishing

We first construct a message and then a request using Google’s builders. We then declare a single-element source that runs through the publishing flow. All messages sent to the flow are published to Pub/Sub.

Scala
import akka.NotUsed
import akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import akka.stream.scaladsl._

import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub._

import scala.concurrent.Future

val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val publishRequest: PublishRequest =
  PublishRequest()
    .withTopic(s"projects/$projectId/topics/$topic")
    .addMessages(publishMessage)

val source: Source[PublishRequest, NotUsed] =
  Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub.publish(parallelism = 1)

val publishedMessageIds: Future[Seq[PublishResponse]] = source.via(publishFlow).runWith(Sink.seq)
Java
import akka.NotUsed;
import akka.stream.alpakka.googlecloud.pubsub.grpc.PubSubSettings;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.GooglePubSub;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.GrpcPublisher;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.PubSubAttributes;
import akka.stream.javadsl.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.*;

import java.util.List;
import java.util.concurrent.CompletionStage;

final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final PublishRequest publishRequest =
    PublishRequest.newBuilder()
        .setTopic("projects/" + projectId + "/topics/" + topic)
        .addMessages(publishMessage)
        .build();

final Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow = GooglePubSub.publish(1);

final CompletionStage<List<PublishResponse>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);

In the same way, we can publish a batch of messages in a single request for greater efficiency.

Scala
val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val messageSource: Source[PubsubMessage, NotUsed] = Source(List(publishMessage, publishMessage))
val published = messageSource
  .groupedWithin(1000, 1.minute)
  .map { msgs =>
    PublishRequest()
      .withTopic(s"projects/$projectId/topics/$topic")
      .addAllMessages(msgs)
  }
  .via(GooglePubSub.publish(parallelism = 1))
  .runWith(Sink.seq)
Java
final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final Source<PubsubMessage, NotUsed> messageSource = Source.single(publishMessage);
final CompletionStage<List<PublishResponse>> published =
    messageSource
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(
            messages ->
                PublishRequest.newBuilder()
                    .setTopic("projects/" + projectId + "/topics/" + topic)
                    .addAllMessages(messages)
                    .build())
        .via(GooglePubSub.publish(1))
        .runWith(Sink.seq(), system);
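
The examples on this page build topic and subscription names by concatenating strings in the form projects/{project}/topics/{topic}. As a minimal sketch, a small helper could centralize that formatting. The class and method names below are hypothetical and not part of the connector, and the validation is only an illustrative assumption, not Google's full naming rules.

```java
/** Hypothetical helper, not part of the connector: builds Pub/Sub resource names. */
final class PubSubNames {
    private PubSubNames() {}

    /** Returns "projects/{projectId}/topics/{topic}". */
    public static String topic(String projectId, String topic) {
        return "projects/" + segment(projectId, "projectId")
            + "/topics/" + segment(topic, "topic");
    }

    /** Returns "projects/{projectId}/subscriptions/{subscription}". */
    public static String subscription(String projectId, String subscription) {
        return "projects/" + segment(projectId, "projectId")
            + "/subscriptions/" + segment(subscription, "subscription");
    }

    // Rejects values that would produce a malformed path (null, empty, or containing a slash).
    private static String segment(String value, String label) {
        if (value == null || value.isEmpty() || value.contains("/")) {
            throw new IllegalArgumentException("Invalid " + label + ": " + value);
        }
        return value;
    }
}
```

The result of PubSubNames.topic(projectId, topic) could then be passed to setTopic in the publishing examples.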

Subscribing

To receive messages from a subscription, there are two options: StreamingPullRequests or synchronous PullRequests. To decide whether you should use StreamingPullRequest or PullRequest, see "StreamingPull: Dealing with large backlogs of small messages" and "Synchronous Pull" in Google Cloud Pub/Sub’s documentation.

StreamingPullRequest

To receive messages from a subscription, we first create a StreamingPullRequest with the FQRS of a subscription and a deadline for acknowledgements in seconds. Google requires that only the first StreamingPullRequest has the subscription and the deadline set. This connector takes care of that and clears the subscription FQRS and the deadline for subsequent StreamingPullRequest messages.

Scala
val projectId = "alpakka"
val subscription = "simpleSubscription"

val request = StreamingPullRequest()
  .withSubscription(s"projects/$projectId/subscriptions/$subscription")
  .withStreamAckDeadlineSeconds(10)

val subscriptionSource: Source[ReceivedMessage, Future[Cancellable]] =
  GooglePubSub.subscribe(request, pollInterval = 1.second)
Java
final String projectId = "alpakka";
final String subscription = "simpleSubscription";

final StreamingPullRequest request =
    StreamingPullRequest.newBuilder()
        .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
        .setStreamAckDeadlineSeconds(10)
        .build();

final Duration pollInterval = Duration.ofSeconds(1);
final Source<ReceivedMessage, CompletableFuture<Cancellable>> subscriptionSource =
    GooglePubSub.subscribe(request, pollInterval);

Here pollInterval is the time between StreamingPullRequests being sent when there are no messages in the subscription.
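
The rule described earlier in this section, that only the first StreamingPullRequest on a stream carries the subscription and the acknowledgement deadline, can be illustrated with a small model. This is a sketch only: the Request class below is a hypothetical stand-in with just those two fields, not the generated Protobuf class, and the code is not the connector's implementation. It assumes that "cleared" means the Protobuf default values (an empty string and 0).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the request sequence on one streaming pull stream. */
final class StreamingPullModel {
    private StreamingPullModel() {}

    /** Stand-in for StreamingPullRequest, reduced to the two fields discussed here. */
    static final class Request {
        public final String subscription;
        public final int streamAckDeadlineSeconds;

        public Request(String subscription, int streamAckDeadlineSeconds) {
            this.subscription = subscription;
            this.streamAckDeadlineSeconds = streamAckDeadlineSeconds;
        }
    }

    /** Returns the initial request unchanged, followed by followUps requests with both fields cleared. */
    public static List<Request> requestsForStream(Request initial, int followUps) {
        List<Request> requests = new ArrayList<>();
        requests.add(initial);
        for (int i = 0; i < followUps; i++) {
            // Assumed Protobuf defaults: empty subscription and a zero deadline.
            requests.add(new Request("", 0));
        }
        return requests;
    }
}
```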

PullRequest

With PullRequest, each request receives a batch of messages, up to the maximum specified by maxMessages.

Scala
val projectId = "alpakka"
val subscription = "simpleSubscription"

val request = PullRequest()
  .withSubscription(s"projects/$projectId/subscriptions/$subscription")
  .withMaxMessages(10)

val subscriptionSource: Source[ReceivedMessage, Future[Cancellable]] =
  GooglePubSub.subscribePolling(request, pollInterval = 1.second)
Java
final String projectId = "alpakka";
final String subscription = "simpleSubscription";

final PullRequest request =
    PullRequest.newBuilder()
        .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
        .setMaxMessages(10)
        .build();

final Duration pollInterval = Duration.ofSeconds(1);
final Source<ReceivedMessage, CompletableFuture<Cancellable>> subscriptionSource =
    GooglePubSub.subscribePolling(request, pollInterval);

Here pollInterval is the time between PullRequest messages.

To minimise latency between requests, you can set a buffer on the source. The right buffer size depends on how many messages you usually receive per request; if you usually receive the maximum number, it is a good idea to set the buffer size equal to the maxMessages parameter. Note that buffered messages spend part of their lease time waiting in the buffer, which reduces the time left to process them before the acknowledgement deadline is reached. How much buffering is acceptable therefore depends on your acknowledgement deadline and processing time.
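
The trade-off between buffer size and acknowledgement deadline can be estimated with simple arithmetic. The sketch below is a rough model under stated assumptions, not part of the connector: it assumes messages are processed one at a time at a fixed cost per message, so the last message in a full buffer of N messages finishes after roughly N times that cost. The class and method names are hypothetical.

```java
import java.time.Duration;

/** Hypothetical estimate of whether a buffer size leaves enough time before the ack deadline. */
final class AckBudget {
    private AckBudget() {}

    /** Worst-case time until the last of bufferSize buffered messages is processed, assuming sequential processing. */
    public static Duration worstCaseLatency(int bufferSize, Duration perMessage) {
        return perMessage.multipliedBy(bufferSize);
    }

    /** True if the worst-case latency does not exceed the acknowledgement deadline. */
    public static boolean fitsDeadline(int bufferSize, Duration perMessage, Duration ackDeadline) {
        return worstCaseLatency(bufferSize, perMessage).compareTo(ackDeadline) <= 0;
    }
}
```

For example, with a buffer of 10 messages and a 10 second deadline, 500 ms per message gives a worst case of 5 seconds, which fits, whereas 2 seconds per message gives 20 seconds, which does not.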

Acknowledge

Messages received from the subscription need to be acknowledged or they will be sent again. To do that, create an AcknowledgeRequest that contains the ackIds of the messages to be acknowledged and send it to a sink created by GooglePubSub.acknowledge.

Scala
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(parallelism = 1)

subscriptionSource
  .map { message =>
    // do something fun
    message.ackId
  }
  .groupedWithin(10, 1.second)
  .map(
    ids =>
      AcknowledgeRequest()
        .withSubscription(
          s"projects/$projectId/subscriptions/$subscription"
        )
        .withAckIds(ids)
  )
  .to(ackSink)
Java
final Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink = GooglePubSub.acknowledge(1);

subscriptionSource
    .map(
        receivedMessage -> {
          // do some computation
          return receivedMessage.getAckId();
        })
    .groupedWithin(10, Duration.ofSeconds(1))
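    .map(
        acks ->
            AcknowledgeRequest.newBuilder()
                // The subscription must be set, as in the Scala example
                .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
                .addAllAckIds(acks)
                .build())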
    .to(ackSink);

Running the test code

Note

The integration tests require the Google Cloud Pub/Sub emulator to be running in the background. You can start it quickly using Docker:

docker compose up -d gcloud-pubsub-client

This will also run the Pub/Sub admin client that will create topics and subscriptions used by the integration tests.

Tests can be started from sbt by running:

sbt
> google-cloud-pub-sub-grpc/test

There is also an ExampleApp that can be used to test publishing to topics and receiving messages from subscriptions.

To run the example app you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.

sbt
env GOOGLE_APPLICATION_CREDENTIALS=/path/to/application/credentials.json sbt

// receive messages from a subscription
> google-cloud-pub-sub-grpc/Test/run subscribe <project-id> <subscription-name>

// publish a single message to a topic
> google-cloud-pub-sub-grpc/Test/run publish-single <project-id> <topic-name>

// continually publish a message stream to a topic
> google-cloud-pub-sub-grpc/Test/run publish-stream <project-id> <topic-name>