Google Cloud Pub/Sub

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information at the official Google Cloud documentation website.

This connector communicates to Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, take a look at the alternative Alpakka Google Cloud Pub/Sub gRPC connector.

Project Info: Alpakka Google Cloud PubSub
Artifact
com.lightbend.akka
akka-stream-alpakka-google-cloud-pub-sub
1.1.1
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.11.12
JPMS module nameakka.stream.alpakka.google.cloud.pubsub
License
Readiness level
Since 0.7, 2017-03-31
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "1.1.1"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub_2.12</artifactId>
  <version>1.1.1</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-google-cloud-pub-sub_2.12', version: '1.1.1'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.pauldijoujwt-core_2.122.1.0Apache-2.0
com.typesafe.akkaakka-http-spray-json_2.1210.1.8Apache-2.0
com.typesafe.akkaakka-http_2.1210.1.8Apache-2.0
com.typesafe.akkaakka-stream_2.122.5.23Apache License, Version 2.0
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.pauldijou    jwt-core_2.12    2.1.0    Apache-2.0
    org.bouncycastle    bcpkix-jdk15on    1.60    Bouncy Castle Licence
        org.bouncycastle    bcprov-jdk15on    1.60    Bouncy Castle Licence
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-http-spray-json_2.12    10.1.8    Apache-2.0
    com.typesafe.akka    akka-http_2.12    10.1.8    Apache-2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.8    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.8    Apache-2.0
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    io.spray    spray-json_2.12    1.3.5    Apache 2
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-http_2.12    10.1.8    Apache-2.0
    com.typesafe.akka    akka-http-core_2.12    10.1.8    Apache-2.0
        com.typesafe.akka    akka-parsing_2.12    10.1.8    Apache-2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Usage

Prepare your credentials for access to google cloud pub/sub.

Scala
val privateKey =
  """-----BEGIN RSA PRIVATE KEY-----
    |MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2
    |9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh
    |Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA
    |2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD
    |uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW
    |fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz
    |Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9
    |-----END RSA PRIVATE KEY-----""".stripMargin
val clientEmail = "[email protected]"
val projectId = "test-XXXXX"
val apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE"

val config = PubSubConfig(projectId, clientEmail, privateKey)

val topic = "topic1"
val subscription = "subscription1"
Java
String privateKey =
    "-----BEGIN RSA PRIVATE KEY-----\n"
        + "MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2\n"
        + "9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh\n"
        + "Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA\n"
        + "2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD\n"
        + "uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW\n"
        + "fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz\n"
        + "Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9\n"
        + "-----END RSA PRIVATE KEY-----";

String clientEmail = "[email protected]";
String projectId = "test-XXXXX";
String apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE";

PubSubConfig config = PubSubConfig.create(projectId, clientEmail, privateKey, system);

String topic = "topic1";
String subscription = "subscription1";

And prepare the actor system and materializer.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.

Scala
val publishMessage =
  PubSubMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
PubSubMessage publishMessage =
    PubSubMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.of(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1, system, materializer);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), materializer);

To get greater performance you can batch messages together, here we send batches with a maximum size of 1000 or at a maximum of 1 minute apart depending on the source.

Scala
val messageSource: Source[PubSubMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource.groupedWithin(1000, 1.minute).map(PublishRequest.apply).via(publishFlow).to(Sink.seq)
Java
Source<PubSubMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.of(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), materializer);

To consume the messages from a subscription you must subscribe then acknowledge the received messages. PublishRequest

Scala
val subscriptionSource: Source[ReceivedMessage, NotUsed] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
Source<ReceivedMessage, NotUsed> subscriptionSource =
    GooglePubSub.subscribe(subscription, config, system);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config, 1, system, materializer);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.of(acks))
    .to(ackSink);

If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.

Scala
val subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = ???
val processMessage: Sink[ReceivedMessage, NotUsed] = ???

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
Java
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.of(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);

Running the examples

To run the example code you will need to configure a project and pub/sub in google cloud and provide your own credentials.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.