Google Cloud Pub/Sub
Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.
Further information at the official Google Cloud documentation website.
This connector communicates to Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, take a look at the alternative Alpakka Google Cloud Pub/Sub gRPC connector.
Project Info: Alpakka Google Cloud PubSub | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-google-cloud-pub-sub
1.0.2
|
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
JPMS module name | akka.stream.alpakka.google.cloud.pubsub |
License | |
Readiness level |
Since 0.7, 2017-03-31
|
Home page | https://doc.akka.io/docs/alpakka/current/ |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "1.0.2"
- Maven
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-google-cloud-pub-sub_2.12</artifactId> <version>1.0.2</version> </dependency>
- Gradle
dependencies { compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub_2.12', version: '1.0.2' }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version License com.pauldijou jwt-core_2.12 2.1.0 Apache-2.0 com.typesafe.akka akka-http-spray-json_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-http_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-stream_2.12 2.5.22 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause - Dependency tree
com.pauldijou jwt-core_2.12 2.1.0 Apache-2.0 org.bouncycastle bcpkix-jdk15on 1.60 Bouncy Castle Licence org.bouncycastle bcprov-jdk15on 1.60 Bouncy Castle Licence org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-http-spray-json_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-http_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.7 Apache-2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause io.spray spray-json_2.12 1.3.5 Apache 2 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-http_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-http-core_2.12 10.1.7 Apache-2.0 com.typesafe.akka akka-parsing_2.12 10.1.7 Apache-2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-stream_2.12 2.5.22 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.22 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-protobuf_2.12 2.5.22 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe ssl-config-core_2.12 0.3.7 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause
Usage
Prepare your credentials for access to google cloud pub/sub.
- Scala
-
val privateKey = """-----BEGIN RSA PRIVATE KEY----- |MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2 |9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh |Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA |2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD |uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW |fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz |Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9 |-----END RSA PRIVATE KEY-----""".stripMargin val clientEmail = "[email protected]" val projectId = "test-XXXXX" val apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE" val config = PubSubConfig(projectId, clientEmail, privateKey) val topic = "topic1" val subscription = "subscription1"
- Java
-
String privateKey = "-----BEGIN RSA PRIVATE KEY-----\n" + "MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2\n" + "9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh\n" + "Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA\n" + "2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD\n" + "uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW\n" + "fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz\n" + "Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9\n" + "-----END RSA PRIVATE KEY-----"; String clientEmail = "[email protected]"; String projectId = "test-XXXXX"; String apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE"; PubSubConfig config = PubSubConfig.create(projectId, clientEmail, privateKey, system); String topic = "topic1"; String subscription = "subscription1";
And prepare the actor system and materializer.
- Scala
-
implicit val system = ActorSystem() implicit val mat = ActorMaterializer()
- Java
-
ActorSystem system = ActorSystem.create(); ActorMaterializer materializer = ActorMaterializer.create(system);
To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.
- Scala
-
val publishMessage = PubSubMessage(messageId = "1", data = new String(Base64.getEncoder.encode("Hello Google!".getBytes))) val publishRequest = PublishRequest(Seq(publishMessage)) val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest) val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] = GooglePubSub.publish(topic, config) val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
- Java
-
PubSubMessage publishMessage = PubSubMessage.create( "1", new String(Base64.getEncoder().encode("Hello Google!".getBytes()))); PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage)); Source<PublishRequest, NotUsed> source = Source.single(publishRequest); Flow<PublishRequest, List<String>, NotUsed> publishFlow = GooglePubSub.publish(topic, config, 1, system, materializer); CompletionStage<List<List<String>>> publishedMessageIds = source.via(publishFlow).runWith(Sink.seq(), materializer);
To get greater performance you can batch messages together, here we send batches with a maximum size of 1000 or at a maximum of 1 minute apart depending on the source.
- Scala
-
val messageSource: Source[PubSubMessage, NotUsed] = Source(List(publishMessage, publishMessage)) messageSource.groupedWithin(1000, 1.minute).map(PublishRequest.apply).via(publishFlow).to(Sink.seq)
- Java
-
Source<PubSubMessage, NotUsed> messageSource = Source.single(publishMessage); messageSource .groupedWithin(1000, Duration.ofMinutes(1)) .map(messages -> PublishRequest.of(messages)) .via(publishFlow) .runWith(Sink.ignore(), materializer);
To consume the messages from a subscription you must subscribe then acknowledge the received messages. PublishRequest
- Scala
-
val subscriptionSource: Source[ReceivedMessage, NotUsed] = GooglePubSub.subscribe(subscription, config) val ackSink: Sink[AcknowledgeRequest, Future[Done]] = GooglePubSub.acknowledge(subscription, config) subscriptionSource .map { message => // do something fun message.ackId } .groupedWithin(1000, 1.minute) .map(AcknowledgeRequest.apply) .to(ackSink)
- Java
-
Source<ReceivedMessage, NotUsed> subscriptionSource = GooglePubSub.subscribe(subscription, config, system); Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink = GooglePubSub.acknowledge(subscription, config, 1, system, materializer); subscriptionSource .map( message -> { // do something fun return message.ackId(); }) .groupedWithin(1000, Duration.ofMinutes(1)) .map(acks -> AcknowledgeRequest.of(acks)) .to(ackSink);
If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.
- Scala
-
val subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = ??? val processMessage: Sink[ReceivedMessage, NotUsed] = ??? val batchAckSink = Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink) val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
- Java
-
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink; Sink<ReceivedMessage, NotUsed> batchAckSink = Flow.of(ReceivedMessage.class) .map(t -> t.ackId()) .groupedWithin(1000, Duration.ofMinutes(1)) .map(ids -> AcknowledgeRequest.of(ids)) .to(ackSink); subscriptionSource.alsoTo(batchAckSink).to(processSink);
Running the examples
To run the example code you will need to configure a project and pub/sub in google cloud and provide your own credentials.