Google Cloud Pub/Sub gRPC
Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.
Further information is available in the official Google Cloud documentation.
This connector communicates with Pub/Sub via the gRPC protocol. The integration between Akka Streams and gRPC is handled by the Akka gRPC library. For a connector that uses HTTP instead, see the alternative Alpakka Google Cloud Pub/Sub connector.
Project Info: Alpakka Google Cloud PubSub (gRPC) | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-google-cloud-pub-sub-grpc 1.0.2 |
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7 |
JPMS module name | akka.stream.alpakka.google.cloud.pubsub.grpc |
License | |
Readiness level | Since 1.0-M1, 2018-11-06 |
Home page | https://doc.akka.io/docs/alpakka/current/ |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" % "akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$" % "$version$"
- Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$</artifactId>
  <version>$version$</version>
</dependency>
- Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub-grpc_$scalaBinaryVersion$', version: '$version$'
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version | License |
---|---|---|---|
com.google.auth | google-auth-library-oauth2-http | 0.15.0 | BSD New license |
com.google.protobuf | protobuf-java | 3.5.1 | 3-Clause BSD License |
com.google.protobuf | protobuf-java | 3.6.1 | 3-Clause BSD License |
com.lightbend.akka.grpc | akka-grpc-runtime_2.12 | 0.6.1 | Apache-2.0 |
com.thesamet.scalapb | scalapb-runtime_2.12 | 0.8.4 | Apache 2 |
com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
io.grpc | grpc-auth | 1.20.0 | Apache 2.0 |
io.grpc | grpc-stub | 1.20.0 | Apache 2.0 |
org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |
- Dependency tree
com.google.auth google-auth-library-oauth2-http 0.15.0 BSD New license com.google.auth google-auth-library-credentials 0.15.0 BSD New license com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license com.google.http-client google-http-client-jackson2 1.29.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.9.6 The Apache Software License, Version 2.0 com.google.http-client google-http-client 1.29.0 The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, 
Version 2.0 io.opencensus opencensus-contrib-http-util 0.18.0 The Apache License, Version 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 org.apache.httpcomponents httpclient 4.5.5 Apache License, Version 2.0 commons-codec commons-codec 1.10 Apache License, Version 2.0 commons-logging commons-logging 1.2 The Apache Software License, Version 2.0 org.apache.httpcomponents httpcore 4.4.9 Apache License, Version 2.0 com.google.http-client google-http-client 1.29.0 The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license com.google.j2objc 
j2objc-annotations 1.1 The Apache Software License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-contrib-http-util 0.18.0 The Apache License, Version 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 org.apache.httpcomponents httpclient 4.5.5 Apache License, Version 2.0 commons-codec commons-codec 1.10 Apache License, Version 2.0 commons-logging commons-logging 1.2 The Apache Software License, Version 2.0 org.apache.httpcomponents httpcore 4.4.9 Apache License, Version 2.0 com.google.protobuf protobuf-java 3.6.1 3-Clause BSD License com.lightbend.akka.grpc akka-grpc-runtime_2.12 0.6.1 Apache-2.0 com.thesamet.scalapb scalapb-runtime_2.12 0.8.4 Apache 2 com.google.protobuf protobuf-java 3.6.1 3-Clause BSD License com.lihaoyi fastparse_2.12 1.0.0 MIT license com.lihaoyi fastparse-utils_2.12 1.0.0 MIT license com.lihaoyi sourcecode_2.12 0.1.4 MIT org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.lihaoyi sourcecode_2.12 0.1.4 MIT org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.thesamet.scalapb lenses_2.12 0.8.4 Apache 2 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause 
com.typesafe.akka akka-discovery_2.12 2.5.20 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.22 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-http-core_2.12 10.1.8 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.8 Apache-2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-http2-support_2.12 10.1.8 Apache-2.0 com.twitter hpack 1.0.2 The Apache Software License, Version 2.0 com.typesafe.akka akka-http-core_2.12 10.1.8 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.8 Apache-2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.eclipse.jetty.alpn alpn-api 1.1.3.v20160715 Apache Software License - Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-http_2.12 10.1.8 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.1.8 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.8 Apache-2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-stream_2.12 2.5.22 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.22 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-protobuf_2.12 2.5.22 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules 
scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe config 1.3.3 Apache License, Version 2.0 com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause io.grpc grpc-core 1.20.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.code.gson gson 2.7 Apache 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-context 1.20.0 Apache 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-contrib-grpc-metrics 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-netty-shaded 1.20.0 Apache 2.0 io.grpc grpc-core 1.20.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.findbugs 
jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.code.gson gson 2.7 Apache 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-context 1.20.0 Apache 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-contrib-grpc-metrics 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license org.scala-lang scala-library 2.12.7 BSD 3-Clause com.thesamet.scalapb scalapb-runtime_2.12 0.8.4 Apache 2 com.google.protobuf protobuf-java 3.6.1 3-Clause BSD License com.lihaoyi fastparse_2.12 1.0.0 MIT license com.lihaoyi fastparse-utils_2.12 1.0.0 MIT license com.lihaoyi sourcecode_2.12 0.1.4 MIT org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.lihaoyi sourcecode_2.12 0.1.4 MIT org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.thesamet.scalapb lenses_2.12 0.8.4 Apache 2 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-stream_2.12 2.5.22 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.22 Apache License, Version 2.0 com.typesafe config 
1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-protobuf_2.12 2.5.22 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.7 BSD 3-Clause io.grpc grpc-auth 1.20.0 Apache 2.0 com.google.auth google-auth-library-credentials 0.15.0 BSD New license io.grpc grpc-core 1.20.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.code.gson gson 2.7 Apache 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-context 1.20.0 Apache 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-contrib-grpc-metrics 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, 
Version 2.0 org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-stub 1.20.0 Apache 2.0 io.grpc grpc-core 1.20.0 Apache 2.0 com.google.android annotations 4.1.1.4 Apache 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.code.gson gson 2.7 Apache 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava guava 27.1-android The Apache Software License, Version 2.0 com.google.code.findbugs jsr305 3.0.2 The Apache Software License, Version 2.0 com.google.errorprone error_prone_annotations 2.3.2 Apache 2.0 com.google.guava failureaccess 1.0.1 The Apache Software License, Version 2.0 com.google.guava listenablefuture 9999.0-empty-to-avoid-conflict-with-guava The Apache Software License, Version 2.0 com.google.j2objc j2objc-annotations 1.1 The Apache Software License, Version 2.0 org.checkerframework checker-compat-qual 2.5.2 GNU General Public License, version 2 (GPL2), with the classpath exception org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license io.grpc grpc-context 1.20.0 Apache 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-contrib-grpc-metrics 0.19.2 The Apache License, Version 2.0 io.opencensus opencensus-api 0.19.2 The Apache License, Version 2.0 org.codehaus.mojo animal-sniffer-annotations 1.17 MIT license org.scala-lang scala-library 2.12.7 BSD 3-Clause
Configuration
The connector's default settings target the Google Cloud Pub/Sub endpoint and locate credentials in the default way, by reading the GOOGLE_APPLICATION_CREDENTIALS environment variable. See Google's official documentation for details on how to obtain credentials for your application.
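A missing or mistyped credentials path may only surface once the first call is made, so it can help to check the variable at startup. The following is a minimal sketch using only the JDK. The class CredentialsCheck and its locate method are hypothetical helpers, not part of the connector, and the check covers only the environment variable, not any other lookup the Google auth library may perform.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

/** Hypothetical startup check; not part of the connector API. */
final class CredentialsCheck {
  static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";

  /**
   * Returns the credentials path if the variable is set and points to a
   * readable regular file, otherwise an empty Optional.
   * The environment is passed in so the check can be exercised in tests.
   */
  static Optional<Path> locate(Map<String, String> env) {
    String value = env.get(ENV_VAR);
    if (value == null || value.trim().isEmpty()) {
      return Optional.empty();
    }
    Path path = Paths.get(value.trim());
    boolean usable = Files.isRegularFile(path) && Files.isReadable(path);
    return usable ? Optional.of(path) : Optional.empty();
  }

  public static void main(String[] args) {
    Optional<Path> credentials = locate(System.getenv());
    if (credentials.isPresent()) {
      System.out.println("Using credentials file " + credentials.get());
    } else {
      System.out.println(ENV_VAR + " is unset or does not point to a readable file");
    }
  }
}
```

Calling CredentialsCheck.locate(System.getenv()) before creating the stream gives an early, readable failure instead of an authentication error later on.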
The defaults can be changed (for example, when testing against the emulator) by overriding values from the reference configuration:
- reference.conf
alpakka.google.cloud.pubsub.grpc {
  host = "pubsub.googleapis.com"
  port = 443
  # Set to "none" to disable TLS
  rootCa = "GoogleInternetAuthorityG3.crt"

  # Supported values:
  # * google-application-default
  # * none
  callCredentials = "google-application-default"
}
- Test Configuration
alpakka.google.cloud.pubsub.grpc {
  host = "localhost"
  port = 8538
  rootCa = "none" # no tls
  callCredentials = "none" # no authentication
}
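The same four keys can also be supplied without editing a configuration file. The sketch below sets them as JVM system properties, which Typesafe Config's default loading gives precedence over values from configuration files. It assumes the actor system is created from the default configuration and that the properties are set before that happens. EmulatorOverrides is a hypothetical helper name, not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that mirrors the test configuration as system properties. */
final class EmulatorOverrides {
  static final String PREFIX = "alpakka.google.cloud.pubsub.grpc.";

  /** Builds the flat property map for an emulator at the given host and port. */
  static Map<String, String> properties(String host, int port) {
    Map<String, String> props = new LinkedHashMap<>();
    props.put(PREFIX + "host", host);
    props.put(PREFIX + "port", Integer.toString(port));
    props.put(PREFIX + "rootCa", "none"); // no TLS
    props.put(PREFIX + "callCredentials", "none"); // no authentication
    return props;
  }

  /** Applies the overrides; assumed to run before the ActorSystem is created. */
  static void apply(String host, int port) {
    properties(host, port).forEach(System::setProperty);
  }

  public static void main(String[] args) {
    apply("localhost", 8538);
    for (String key : properties("localhost", 8538).keySet()) {
      System.out.println(key + " = " + System.getProperty(key));
    }
  }
}
```

Passing the same keys as -D flags on the JVM command line should have the same effect under the same assumptions.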
A manually initialized GrpcPublisher or GrpcSubscriber can be used by providing it as an attribute to the stream:
- Scala
val settings = PubSubSettings(system)
val publisher = GrpcPublisher(settings)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub
    .publish(parallelism = 1)
    .withAttributes(PubSubAttributes.publisher(publisher))
- Java
final PubSubSettings settings = PubSubSettings.create(system);
final GrpcPublisher publisher = GrpcPublisher.create(settings, system, materializer);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow =
    GooglePubSub.publish(1).withAttributes(PubSubAttributes.publisher(publisher));
Publishing
We first construct a message and then a request using Google’s builders. We then declare a singleton source that runs through our publishing flow. All messages sent to the flow are published to Pub/Sub.
- Scala
import akka.NotUsed
import akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
import akka.stream.scaladsl._
import com.google.protobuf.ByteString
import com.google.pubsub.v1.pubsub._
import scala.concurrent.Future

val projectId = "alpakka"
val topic = "simpleTopic"

// Build the message and wrap it in a request addressed to the topic
val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val publishRequest: PublishRequest =
  PublishRequest()
    .withTopic(s"projects/$projectId/topics/$topic")
    .addMessages(publishMessage)

val source: Source[PublishRequest, NotUsed] =
  Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, PublishResponse, NotUsed] =
  GooglePubSub.publish(parallelism = 1)

val publishedMessageIds: Future[Seq[PublishResponse]] =
  source.via(publishFlow).runWith(Sink.seq)
- Java
import akka.NotUsed;
import akka.stream.alpakka.googlecloud.pubsub.grpc.PubSubSettings;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.GooglePubSub;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.GrpcPublisher;
import akka.stream.alpakka.googlecloud.pubsub.grpc.javadsl.PubSubAttributes;
import akka.stream.javadsl.*;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.*;
import java.util.List;
import java.util.concurrent.CompletionStage;

final String projectId = "alpakka";
final String topic = "simpleTopic";

// Build the message and wrap it in a request addressed to the topic
final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final PublishRequest publishRequest =
    PublishRequest.newBuilder()
        .setTopic("projects/" + projectId + "/topics/" + topic)
        .addMessages(publishMessage)
        .build();

final Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

final Flow<PublishRequest, PublishResponse, NotUsed> publishFlow = GooglePubSub.publish(1);

final CompletionStage<List<PublishResponse>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), materializer);
Similarly to before, we can publish a batch of messages for greater efficiency.
- Scala
val projectId = "alpakka"
val topic = "simpleTopic"

val publishMessage: PubsubMessage =
  PubsubMessage()
    .withData(ByteString.copyFromUtf8("Hello world!"))

val messageSource: Source[PubsubMessage, NotUsed] = Source(List(publishMessage, publishMessage))

val published = messageSource
  .groupedWithin(1000, 1.minute)
  .map { msgs =>
    PublishRequest()
      .withTopic(s"projects/$projectId/topics/$topic")
      .addAllMessages(msgs)
  }
  .via(GooglePubSub.publish(parallelism = 1))
  .runWith(Sink.seq)
- Java
final String projectId = "alpakka";
final String topic = "simpleTopic";

final PubsubMessage publishMessage =
    PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("Hello world!")).build();

final Source<PubsubMessage, NotUsed> messageSource = Source.single(publishMessage);

final CompletionStage<List<PublishResponse>> published =
    messageSource
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(
            messages ->
                PublishRequest.newBuilder()
                    .setTopic("projects/" + projectId + "/topics/" + topic)
                    .addAllMessages(messages)
                    .build())
        .via(GooglePubSub.publish(1))
        .runWith(Sink.seq(), materializer);
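In the batch examples, groupedWithin emits a group once 1000 messages have been collected or one minute has passed, whichever comes first, and each group becomes a single PublishRequest. The following JDK-only sketch illustrates just the size bound of that grouping, without the time window and without Akka Streams. BatchSketch and batches are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of size-bounded batching; not the connector's implementation. */
final class BatchSketch {

  /** Splits items into consecutive batches of at most maxSize elements, preserving order. */
  static <T> List<List<T>> batches(List<T> items, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int start = 0; start < items.size(); start += maxSize) {
      int end = Math.min(start + maxSize, items.size());
      // Copy the view so each batch is independent of the input list
      result.add(new ArrayList<>(items.subList(start, end)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5");
    // prints [[m1, m2], [m3, m4], [m5]]
    System.out.println(batches(messages, 2));
  }
}
```

With a bound of 2, five messages would map to three publish requests; the stream-based examples get the additional guarantee that a partially filled group is still flushed when the time window ends.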
Subscribing
To receive messages from a subscription, first create a StreamingPullRequest with the FQRS of the subscription and a deadline for acknowledgements in seconds. Google requires that only the first StreamingPullRequest has the subscription and the deadline set. This connector takes care of that and clears the subscription FQRS and the deadline for subsequent StreamingPullRequest messages.
- Scala
val projectId = "alpakka"
val subscription = "simpleSubscription"

val request = StreamingPullRequest()
  .withSubscription(s"projects/$projectId/subscriptions/$subscription")
  .withStreamAckDeadlineSeconds(10)

val subscriptionSource: Source[ReceivedMessage, Future[Cancellable]] =
  GooglePubSub.subscribe(request, pollInterval = 1.second)
- Java
final String projectId = "alpakka";
final String subscription = "simpleSubscription";

final StreamingPullRequest request =
    StreamingPullRequest.newBuilder()
        .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
        .setStreamAckDeadlineSeconds(10)
        .build();

final Duration pollInterval = Duration.ofSeconds(1);
final Source<ReceivedMessage, CompletableFuture<Cancellable>> subscriptionSource =
    GooglePubSub.subscribe(request, pollInterval);
Here pollInterval is the time between successive StreamingPullRequests that are sent when there are no messages in the subscription.
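The examples address topics and subscriptions by paths of the form projects/&lt;project&gt;/topics/&lt;topic&gt; and projects/&lt;project&gt;/subscriptions/&lt;subscription&gt;, assembled by string concatenation. A small helper can keep that format in one place. The sketch below is a hypothetical convenience class (PubSubNames is not part of the connector), and its validation is an assumption that only rejects empty segments and segments containing a slash.

```java
/** Hypothetical helper for building the resource paths used in the examples. */
final class PubSubNames {

  /** Returns "projects/<projectId>/topics/<topic>". */
  static String topic(String projectId, String topic) {
    return "projects/" + segment(projectId, "projectId") + "/topics/" + segment(topic, "topic");
  }

  /** Returns "projects/<projectId>/subscriptions/<subscription>". */
  static String subscription(String projectId, String subscription) {
    return "projects/"
        + segment(projectId, "projectId")
        + "/subscriptions/"
        + segment(subscription, "subscription");
  }

  // Assumed sanity check: a segment must be non-empty and must not contain '/'
  private static String segment(String value, String label) {
    if (value == null || value.isEmpty() || value.contains("/")) {
      throw new IllegalArgumentException(label + " must be a non-empty name without '/'");
    }
    return value;
  }

  public static void main(String[] args) {
    // prints projects/alpakka/topics/simpleTopic
    System.out.println(topic("alpakka", "simpleTopic"));
    // prints projects/alpakka/subscriptions/simpleSubscription
    System.out.println(subscription("alpakka", "simpleSubscription"));
  }
}
```

The results can be passed directly to setTopic and setSubscription on the request builders shown in the Java examples.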
Messages received from the subscription need to be acknowledged or they will be sent again. To do that, create an AcknowledgeRequest that contains the ackIds of the messages to be acknowledged and send it to a sink created by GooglePubSub.acknowledge.
- Scala
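val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(parallelism = 1)

subscriptionSource
  .map { message =>
    // do something fun
    message.ackId
  }
  .groupedWithin(10, 1.second)
  // an acknowledge request names the subscription the ackIds belong to
  .map { ids =>
    AcknowledgeRequest(
      subscription = s"projects/$projectId/subscriptions/$subscription",
      ackIds = ids
    )
  }
  .to(ackSink)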
- Java
final Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink = GooglePubSub.acknowledge(1);

subscriptionSource
    .map(
        receivedMessage -> {
          // do some computation
          return receivedMessage.getAckId();
        })
    .groupedWithin(10, Duration.ofSeconds(1))
    // an acknowledge request names the subscription the ackIds belong to
    .map(
        acks ->
            AcknowledgeRequest.newBuilder()
                .setSubscription("projects/" + projectId + "/subscriptions/" + subscription)
                .addAllAckIds(acks)
                .build())
    .to(ackSink);
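Because unacknowledged messages are sent again, a consumer may see the same message more than once, for instance when an acknowledgement does not arrive in time. One way to cope is to remember recently processed message IDs and skip repeats. The sketch below is a JDK-only illustration under that assumption; SeenMessageIds is a hypothetical class, not part of the connector, and the ID passed in would typically be the message ID carried by the received message.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical bounded memory of processed message IDs; not part of the connector. */
final class SeenMessageIds {
  private final Map<String, Boolean> seen;

  SeenMessageIds(final int capacity) {
    if (capacity <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    // Insertion-ordered map that drops its oldest entry once capacity is exceeded
    this.seen =
        new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
            return size() > capacity;
          }
        };
  }

  /** Returns true the first time an ID is offered and false while it is still remembered. */
  synchronized boolean firstTime(String messageId) {
    return seen.put(messageId, Boolean.TRUE) == null;
  }

  public static void main(String[] args) {
    SeenMessageIds ids = new SeenMessageIds(1000);
    System.out.println(ids.firstTime("msg-1")); // prints true
    System.out.println(ids.firstTime("msg-1")); // prints false
  }
}
```

A processing stage could call firstTime before doing its work and still forward the ackId either way, so that redelivered messages are acknowledged without being processed twice. The bounded capacity means very old IDs are forgotten, which trades memory for a small chance of reprocessing.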
Running the test code
The integration test code requires the Google Cloud Pub/Sub emulator running in the background. You can start it quickly using Docker:
docker-compose up -d gcloud-pubsub-client
This will also run the Pub/Sub admin client that will create topics and subscriptions used by the integration tests.
Tests can be started from sbt by running:
- sbt
-
> google-cloud-pub-sub-grpc/test
There is also an ExampleApp that can be used to test publishing to topics and receiving messages from subscriptions.
To run the example app you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.
- sbt
env GOOGLE_APPLICATION_CREDENTIALS=/path/to/application/credentials.json sbt

// receive messages from a subscription
> google-cloud-pub-sub-grpc/Test/run subscribe <project-id> <subscription-name>

// publish a single message to a topic
> google-cloud-pub-sub-grpc/Test/run publish-single <project-id> <topic-name>

// continually publish a message stream to a topic
> google-cloud-pub-sub-grpc/Test/run publish-stream <project-id> <topic-name>