Google Cloud Pub/Sub gRPC

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information at the official Google Cloud documentation website.

This connector communicates to Pub/Sub via the gRPC protocol. The integration between Akka Stream and gRPC is handled by the Akka gRPC library. For a connector that uses HTTP for the communication, take a look at the alternative Alpakka Google Cloud Pub/Sub connector.

Project Info: Alpakka Google Cloud PubSub (gRPC)
Artifact
com.lightbend.akka
akka-stream-alpakka-google-cloud-pub-sub-grpc
1.0-M2
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.11.12
JPMS module nameakka.stream.alpakka.google.cloud.pubsub.grpc
License
Readiness level
Community-driven
Since 1.0-M1, 2018-11-06
Home pagehttps://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" % "akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$" % "$version$"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$</artifactId>
  <version>$version$</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$', version: '$version$'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.google.authgoogle-auth-library-oauth2-http0.10.0BSD New license
com.google.protobufprotobuf-java3.5.13-Clause BSD License
com.google.protobufprotobuf-java3.6.03-Clause BSD License
com.lightbend.akka.grpcakka-grpc-runtime_2.120.4Apache-2.0
com.thesamet.scalapbscalapb-runtime_2.120.8.0Apache 2
com.typesafe.akkaakka-stream_2.122.5.19Apache License, Version 2.0
io.grpcgrpc-auth1.14.0Apache 2.0
io.grpcgrpc-stub1.15.0Apache 2.0
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.google.auth    google-auth-library-oauth2-http    0.10.0    BSD New license
    com.google.auth    google-auth-library-credentials    0.10.0    BSD New license
    com.google.guava    guava    20.0    The Apache Software License, Version 2.0
    com.google.http-client    google-http-client-jackson2    1.19.0    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.1.3    The Apache Software License, Version 2.0
        com.google.http-client    google-http-client    1.19.0    The Apache Software License, Version 2.0
            com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
            org.apache.httpcomponents    httpclient    4.0.1    Apache License
                commons-codec    commons-codec    1.3    The Apache Software License, Version 2.0
                commons-logging    commons-logging    1.1.1    The Apache Software License, Version 2.0
                org.apache.httpcomponents    httpcore    4.0.1    Apache License
    com.google.http-client    google-http-client    1.19.0    The Apache Software License, Version 2.0
        com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
        org.apache.httpcomponents    httpclient    4.0.1    Apache License
            commons-codec    commons-codec    1.3    The Apache Software License, Version 2.0
            commons-logging    commons-logging    1.1.1    The Apache Software License, Version 2.0
            org.apache.httpcomponents    httpcore    4.0.1    Apache License
com.google.protobuf    protobuf-java    3.6.0    3-Clause BSD License
com.lightbend.akka.grpc    akka-grpc-runtime_2.12    0.4    Apache-2.0
    com.lightbend.akka.discovery    akka-discovery_2.12    0.18.0    Apache-2.0
        com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.thesamet.scalapb    scalapb-runtime_2.12    0.8.0    Apache 2
        com.google.protobuf    protobuf-java    3.6.0    3-Clause BSD License
        com.lihaoyi    fastparse_2.12    1.0.0    MIT license
            com.lihaoyi    fastparse-utils_2.12    1.0.0    MIT license
                com.lihaoyi    sourcecode_2.12    0.1.4    MIT
                    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            com.lihaoyi    sourcecode_2.12    0.1.4    MIT
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.thesamet.scalapb    lenses_2.12    0.8.0    Apache 2
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-http-core_2.12    10.1.5    Apache-2.0
        com.typesafe.akka    akka-parsing_2.12    10.1.5    Apache-2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-http2-support_2.12    10.1.5    Apache-2.0
        com.twitter    hpack    1.0.2    The Apache Software License, Version 2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.5    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.5    Apache-2.0
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.eclipse.jetty.alpn    alpn-api    1.1.3.v20160715    Apache Software License - Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-http_2.12    10.1.5    Apache-2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.5    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.5    Apache-2.0
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.reactivestreams    reactive-streams    1.0.2    CC0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    config    1.3.3    Apache License, Version 2.0
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    io.grpc    grpc-core    1.15.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
        com.google.code.gson    gson    2.7    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        com.google.guava    guava    20.0    The Apache Software License, Version 2.0
        io.grpc    grpc-context    1.15.0    Apache 2.0
        io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        io.opencensus    opencensus-contrib-grpc-metrics    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.17    MIT license
    io.grpc    grpc-netty-shaded    1.14.0    Apache 2.0
        io.grpc    grpc-core    1.15.0    Apache 2.0
            com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
            com.google.code.gson    gson    2.7    Apache 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            com.google.guava    guava    20.0    The Apache Software License, Version 2.0
            io.grpc    grpc-context    1.15.0    Apache 2.0
            io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            io.opencensus    opencensus-contrib-grpc-metrics    0.12.3    The Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
                io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
                    com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            org.codehaus.mojo    animal-sniffer-annotations    1.17    MIT license
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.thesamet.scalapb    scalapb-runtime_2.12    0.8.0    Apache 2
    com.google.protobuf    protobuf-java    3.6.0    3-Clause BSD License
    com.lihaoyi    fastparse_2.12    1.0.0    MIT license
        com.lihaoyi    fastparse-utils_2.12    1.0.0    MIT license
            com.lihaoyi    sourcecode_2.12    0.1.4    MIT
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.lihaoyi    sourcecode_2.12    0.1.4    MIT
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.thesamet.scalapb    lenses_2.12    0.8.0    Apache 2
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
io.grpc    grpc-auth    1.14.0    Apache 2.0
    com.google.auth    google-auth-library-credentials    0.10.0    BSD New license
    io.grpc    grpc-core    1.15.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
        com.google.code.gson    gson    2.7    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        com.google.guava    guava    20.0    The Apache Software License, Version 2.0
        io.grpc    grpc-context    1.15.0    Apache 2.0
        io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        io.opencensus    opencensus-contrib-grpc-metrics    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.17    MIT license
io.grpc    grpc-stub    1.15.0    Apache 2.0
    io.grpc    grpc-core    1.15.0    Apache 2.0
        com.google.code.findbugs    jsr305    3.0.0    The Apache Software License, Version 2.0
        com.google.code.gson    gson    2.7    Apache 2.0
        com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        com.google.guava    guava    20.0    The Apache Software License, Version 2.0
        io.grpc    grpc-context    1.15.0    Apache 2.0
        io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        io.opencensus    opencensus-contrib-grpc-metrics    0.12.3    The Apache License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
            io.opencensus    opencensus-api    0.12.3    The Apache License, Version 2.0
                com.google.errorprone    error_prone_annotations    2.2.0    Apache 2.0
        org.codehaus.mojo    animal-sniffer-annotations    1.17    MIT license
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Configuration

The connector comes with the default settings configured to work with the Google Pub Sub endpoint and uses the default way of locating credentials by looking at the GOOGLE_APPLICATION_CREDENTIAL environment variable. Please check Google official documentation for more details on how to obtain credentials for your application.

The defaults can be changed (for example when testing against the emulator) by tweaking the reference configuration:

reference.conf
alpakka.google.cloud.pubsub.grpc {
  host = "pubsub.googleapis.com"
  port = 443

  # Set to "none" to disable TLS
  rootCa = "GoogleInternetAuthorityG3.crt"

  # Supported values:
  # * google-application-default
  # * none
  callCredentials = "google-application-default"
}
Test Configuration
alpakka.google.cloud.pubsub.grpc {
  host = "localhost"
  port = 8538
  rootCa = "none" # no tls
  callCredentials = "none" # no authentication
}

Prepare the actor system and materializer:

Scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer

implicit val system = ActorSystem("IntegrationSpec")
implicit val materializer = ActorMaterializer()
Java
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;

static final ActorSystem system = ActorSystem.create("IntegrationTest");
static final Materializer materializer = ActorMaterializer.create(system);

Publishing

We first construct a message and then a request using Google’s builders. We declare a singleton source which will go via our publishing flow. All messages sent to the flow are published to PubSub.

Scala
import akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import akka.stream.scaladsl._
import com.google.pubsub.v1.pubsub._

val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val publishRequest: PublishRequest =
  PublishRequest()
    .withTopic(s"projects/$projectId/topics/$topic")
    .addMessages(publishMessage)

val source: Source[PublishRequest, NotUsed] =
  Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub.publish(parallelism = 1)

val publishedMessageIds: Future[Seq[PublishResponse]] = source.via(publishFlow).runWith(Sink.seq)
Java
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.GooglePubSub;
import akka.stream.javadsl.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.*;

final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final PublishRequest publishRequest =
    PublishRequest.newBuilder()
        .setTopic("projects/" + projectId + "/topics/" + topic)
        .addMessages(publishMessage)
        .build();

final Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow =
    GooglePubSub.publish(1, system);

final CompletionStage<List<PublishResponse>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), materializer);

Similarly to before, we can publish a batch of messages for greater efficiency.

Scala
val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val messageSource: Source[PubsubMessage, NotUsed] = Source(List(publishMessage, publishMessage))
val published = messageSource
  .groupedWithin(1000, 1.minute)
  .map { msgs =>
    PublishRequest()
      .withTopic(s"projects/$projectId/topics/$topic")
      .addAllMessages(msgs)
  }
  .via(GooglePubSub.publish(parallelism = 1))
  .runWith(Sink.seq)
Java
final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final Source<PubsubMessage, NotUsed> messageSource = Source.single(publishMessage);
final CompletionStage<List<PublishResponse>> published =
    messageSource
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(
            messages ->
                PublishRequest.newBuilder()
                    .setTopic("projects/" + projectId + "/topics/" + topic)
                    .addAllMessages(messages)
                    .build())
        .via(GooglePubSub.publish(1, system))
        .runWith(Sink.seq(), materializer);

Subscribing

To receive message from a subscription, first we create a StreamingPullRequest with a FQRS of a subscription and a deadline for acknowledgements in seconds. Google requires that only the first StreamingPullRequest has the subscription and the deadline set. This connector takes care of that and clears up the subscription FQRS and the deadline for subsequent StreamingPullRequest messages.

Scala
val projectId = "alpakka"
val subscription = "simpleSubscription"

val request = StreamingPullRequest()
  .withSubscription(s"projects/$projectId/subscriptions/$subscription")
  .withStreamAckDeadlineSeconds(10)

val subscriptionSource: Source[ReceivedMessage, Future[Cancellable]] =
  GooglePubSub.subscribe(request, pollInterval = 1.second)
Java
final String projectId = "alpakka";
final String subscription = "simpleSubscription";

final StreamingPullRequest request =
    StreamingPullRequest.newBuilder()
        .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
        .setStreamAckDeadlineSeconds(10)
        .build();

final Duration pollInterval = Duration.ofSeconds(1);
final Source<ReceivedMessage, CompletableFuture<Cancellable>> subscriptionSource =
    GooglePubSub.subscribe(request, pollInterval, system);

Here pollInterval is the time between StreamingPullRequests are sent when there are no messages in the subscription.

Messages received from the subscription need to be acknowledged or they will be sent again. To do that create AcknowledgeRequest that contains ackIds of the messages to be acknowledged and send them to a sink created by GooglePubSub.acknowledge.

Scala
val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(parallelism = 1)

subscriptionSource
  .map { message =>
    // do something fun
    message.ackId
  }
  .groupedWithin(10, 1.second)
  .map(ids => AcknowledgeRequest(ackIds = ids))
  .to(ackSink)
Java
final Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(1, system);

subscriptionSource
    .map(
        receivedMessage -> {
          // do some computation
          return receivedMessage.getAckId();
        })
    .groupedWithin(10, Duration.ofSeconds(1))
    .map(acks -> AcknowledgeRequest.newBuilder().addAllAckIds(acks).build())
    .to(ackSink);

Running the test code

Note

Integration test code requires Google Cloud Pub/Sub emulator running in the background. You can start it quickly using docker:

docker-compose up -d gcloud-pubsub-client

This will also run the Pub/Sub admin client that will create topics and subscriptions used by the integration tests.

Tests can be started from sbt by running:

sbt
> google-cloud-pub-sub-grpc/test

There is also an ExampleApp that can be used to test publishing to topics and receiving messages from subscriptions.

To run the example app you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.

sbt
env GOOGLE_APPLICATION_CREDENTIALS=/path/to/application/credentials.json sbt

// receive messages from a subsciptions
> google-cloud-pub-sub-grpc/Test/run subscribe <project-id> <subscription-name>

// publish a single message to a topic
> google-cloud-pub-sub-grpc/Test/run publish-single <project-id> <topic-name>

// continually publish a message stream to a topic
> google-cloud-pub-sub-grpc/Test/run publish-stream <project-id> <topic-name>
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.