Google Cloud Pub/Sub
Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.
Further information is available in the official Google Cloud documentation.
This connector communicates with Pub/Sub over HTTP (that is, via https://pubsub.googleapis.com). For a connector that uses gRPC instead, see the alternative Alpakka Google Cloud Pub/Sub gRPC connector.
| Project Info: Alpakka Google Cloud PubSub | |
|---|---|
| Artifact | com.lightbend.akka akka-stream-alpakka-google-cloud-pub-sub 9.0.1 |
| JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
| Scala versions | 2.13.12 | 
| JPMS module name | akka.stream.alpakka.google.cloud.pubsub | 
| License | |
| Readiness level | Since 0.7, 2017-03-31 |
| Home page | https://doc.akka.io/libraries/alpakka/current | 
| API documentation | |
| Forums | |
| Release notes | GitHub releases | 
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka | 
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
    resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
    <project>
      ...
      <repositories>
        <repository>
          <id>akka-repository</id>
          <name>Akka library repository</name>
          <url>https://repo.akka.io/maven</url>
        </repository>
      </repositories>
    </project>
- Gradle
    repositories {
        mavenCentral()
        maven {
            url "https://repo.akka.io/maven"
        }
    }
Additionally, add the dependencies as below.
- sbt
    val AkkaVersion = "2.10.0"
    val AkkaHttpVersion = "10.7.0"
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "9.0.1",
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
      "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
      "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
    )
- Maven
    <properties>
      <akka.version>2.10.0</akka.version>
      <akka.http.version>10.7.0</akka.http.version>
      <scala.binary.version>2.13</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>com.lightbend.akka</groupId>
        <artifactId>akka-stream-alpakka-google-cloud-pub-sub_${scala.binary.version}</artifactId>
        <version>9.0.1</version>
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream_${scala.binary.version}</artifactId>
        <version>${akka.version}</version>
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-http_${scala.binary.version}</artifactId>
        <version>${akka.http.version}</version>
      </dependency>
      <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId>
        <version>${akka.http.version}</version>
      </dependency>
    </dependencies>
- Gradle
    def versions = [
      AkkaVersion: "2.10.0",
      AkkaHttpVersion: "10.7.0",
      ScalaBinary: "2.13"
    ]
    dependencies {
      implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-pub-sub_${versions.ScalaBinary}:9.0.1"
      implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
      implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
      implementation "com.typesafe.akka:akka-http-spray-json_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
    }
"Direct dependencies" lists the libraries this module depends on directly; "Dependency tree" lists every library it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version |
|---|---|---|
| com.lightbend.akka | akka-stream-alpakka-google-common_2.13 | 9.0.1 |
| com.typesafe.akka | akka-http-spray-json_2.13 | 10.7.0 |
| com.typesafe.akka | akka-http_2.13 | 10.7.0 |
| com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
| org.scala-lang | scala-library | 2.13.12 |

- Dependency tree (each library listed once)

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.lightbend.akka | akka-stream-alpakka-google-common_2.13 | 9.0.1 | |
| com.github.jwt-scala | jwt-json-common_2.13 | 9.4.6 | Apache-2.0 |
| com.github.jwt-scala | jwt-core_2.13 | 9.4.6 | Apache-2.0 |
| com.google.auth | google-auth-library-credentials | 1.24.1 | |
| com.typesafe.akka | akka-http-spray-json_2.13 | 10.7.0 | BUSL-1.1 |
| com.typesafe.akka | akka-http_2.13 | 10.7.0 | BUSL-1.1 |
| com.typesafe.akka | akka-http-core_2.13 | 10.7.0 | BUSL-1.1 |
| com.typesafe.akka | akka-parsing_2.13 | 10.7.0 | BUSL-1.1 |
| com.typesafe.akka | akka-pki_2.13 | 2.10.0 | BUSL-1.1 |
| com.hierynomus | asn-one | 0.6.0 | The Apache License, Version 2.0 |
| com.typesafe.akka | akka-actor_2.13 | 2.10.0 | BUSL-1.1 |
| com.typesafe | config | 1.4.3 | Apache-2.0 |
| org.slf4j | slf4j-api | 2.0.16 | |
| io.spray | spray-json_2.13 | 1.3.6 | Apache 2 |
| com.typesafe.akka | akka-stream_2.13 | 2.10.0 | BUSL-1.1 |
| com.typesafe.akka | akka-protobuf-v3_2.13 | 2.10.0 | BUSL-1.1 |
| org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
| org.scala-lang | scala-library | 2.13.12 | Apache-2.0 |
Usage
The Pub/Sub connector shares its basic configuration with all the Google connectors in Alpakka. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.
First, prepare the actor system and the Pub/Sub configuration.
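- Scala
    import akka.actor.ActorSystem
    import akka.stream.alpakka.googlecloud.pubsub.PubSubConfig

    implicit val system: ActorSystem = ActorSystem()
    val config = PubSubConfig()
    val topic = "topic1"
    val subscription = "subscription1"
- Java
    import akka.actor.ActorSystem;
    import akka.stream.alpakka.googlecloud.pubsub.PubSubConfig;

    ActorSystem system = ActorSystem.create();
    PubSubConfig config = PubSubConfig.create();
    String topic = "topic1";
    String subscription = "subscription1";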
To publish a single request, build the message with a base64-encoded data payload and wrap it in a PublishRequest. Publishing is exposed as a flow that takes PublishRequests and emits the IDs of the accepted messages.
- Scala
    val publishMessage =
      PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
    val publishRequest = PublishRequest(Seq(publishMessage))

    val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

    val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
      GooglePubSub.publish(topic, config)

    val publishedMessageIds: Future[Seq[Seq[String]]] =
      source.via(publishFlow).runWith(Sink.seq)
- Java
    PublishMessage publishMessage =
        PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
    PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

    Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

    Flow<PublishRequest, List<String>, NotUsed> publishFlow =
        GooglePubSub.publish(topic, config, 1);

    CompletionStage<List<List<String>>> publishedMessageIds =
        source.via(publishFlow).runWith(Sink.seq(), system);
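The base64 step in the publish example calls `getBytes` without a charset, so the result depends on the platform default. The following is a minimal sketch using only the Java standard library, assuming the payload is UTF-8 text. The class name `PayloadEncoding` and its methods are hypothetical helpers for illustration and are not part of Alpakka.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the connector: converts between
// plain text and the base64 string used as a message data payload.
final class PayloadEncoding {
    private PayloadEncoding() {}

    // Encodes the text as UTF-8 bytes and returns their base64 representation.
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a base64 payload back into text, assuming UTF-8 content.
    static String decode(String base64) {
        return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
    }
}
```

With this helper, `PayloadEncoding.encode("Hello Google!")` returns `SGVsbG8gR29vZ2xlIQ==`, which could be passed to `PublishMessage.create`. `decode` reverses the step for a payload you have received.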
For better throughput you can batch messages together. The example here emits a batch once it holds 1000 messages or once 1 minute has passed, whichever comes first.
- Scala
    val messageSource: Source[PublishMessage, NotUsed] =
      Source(List(publishMessage, publishMessage))

    messageSource
      .groupedWithin(1000, 1.minute)
      .map(grouped => PublishRequest(grouped))
      .via(publishFlow)
      // run the stream; the returned message IDs are discarded here
      .runWith(Sink.ignore)
- Java
    Source<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);

    messageSource
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(messages -> PublishRequest.create(messages))
        .via(publishFlow)
        .runWith(Sink.ignore(), system);
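To make the size bound concrete, here is a plain-Java sketch of splitting a list into batches of at most a given size. It illustrates only the size limit of `groupedWithin`; the time limit is deliberately left out, and the class `SizeBatcher` is a hypothetical helper, not part of Akka Streams or Alpakka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-bounded batching only.
final class SizeBatcher {
    private SizeBatcher() {}

    // Splits items into consecutive batches of at most maxSize elements.
    // The last batch may be smaller than maxSize.
    static <T> List<List<T>> batches(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the slice so each batch is independent of the input list.
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }
}
```

For example, five items with a maximum size of 2 yield three batches of 2, 2, and 1 items. In the stream, each such batch becomes one PublishRequest.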
To consume messages from a subscription, subscribe to it and then acknowledge the messages you receive.
- Scala
    val subscriptionSource: Source[ReceivedMessage, Cancellable] =
      GooglePubSub.subscribe(subscription, config)

    val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
      GooglePubSub.acknowledge(subscription, config)

    // .to(ackSink) only builds the graph; call .run() on it to start consuming
    subscriptionSource
      .map { message =>
        // do something fun
        message.ackId
      }
      .groupedWithin(1000, 1.minute)
      .map(AcknowledgeRequest.apply)
      .to(ackSink)
- Java
    Source<ReceivedMessage, Cancellable> subscriptionSource =
        GooglePubSub.subscribe(subscription, config);

    Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
        GooglePubSub.acknowledge(subscription, config);

    // .to(ackSink) only builds the graph; call .run(system) on it to start consuming
    subscriptionSource
        .map(
            message -> {
              // do something fun
              return message.ackId();
            })
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(acks -> AcknowledgeRequest.create(acks))
        .to(ackSink);
If you want to acknowledge the messages automatically and also send the ReceivedMessages to your own sink, you can create a graph that feeds both sinks.
- Scala
    val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = ??? // supply your own source
    val processMessage: Sink[ReceivedMessage, NotUsed] = ??? // supply your own sink

    val batchAckSink =
      Flow[ReceivedMessage]
        .map(_.ackId)
        .groupedWithin(1000, 1.minute)
        .map(AcknowledgeRequest.apply)
        .to(ackSink)

    val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
- Java
    Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

    Sink<ReceivedMessage, NotUsed> batchAckSink =
        Flow.of(ReceivedMessage.class)
            .map(t -> t.ackId())
            .groupedWithin(1000, Duration.ofMinutes(1))
            .map(ids -> AcknowledgeRequest.create(ids))
            .to(ackSink);

    subscriptionSource.alsoTo(batchAckSink).to(processSink);
Running the examples
To run the example code, you need to set up a project and Pub/Sub in Google Cloud and provide your own credentials.