Google Cloud Pub/Sub

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information is available in the official Google Cloud documentation.

This connector communicates with Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, take a look at the alternative Alpakka Google Cloud Pub/Sub gRPC connector.

Project Info: Alpakka Google Cloud PubSub
Artifact
com.lightbend.akka
akka-stream-alpakka-google-cloud-pub-sub
2.0.2
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions: 2.12.11, 2.11.12, 2.13.3
JPMS module name: akka.stream.alpakka.google.cloud.pubsub
License
Readiness level
Since 0.7, 2017-03-31
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notes: In the documentation
Issues: Github issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.5.31"
val AkkaHttpVersion = "10.1.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "2.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.5.31</akka.version>
  <akka.http.version>10.1.11</akka.http.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-google-cloud-pub-sub_${scala.binary.version}</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-http_${scala.binary.version}</artifactId>
  <version>${akka.http.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId>
  <version>${akka.http.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.5.31",
  AkkaHttpVersion: "10.1.11",
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-google-cloud-pub-sub_${versions.ScalaBinary}", version: '2.0.2'
  compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
  compile group: 'com.typesafe.akka', name: "akka-http_${versions.ScalaBinary}", version: versions.AkkaHttpVersion
  compile group: 'com.typesafe.akka', name: "akka-http-spray-json_${versions.ScalaBinary}", version: versions.AkkaHttpVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.pauldijou    jwt-core_2.12    3.0.1
com.typesafe.akka    akka-http-spray-json_2.12    10.1.11
com.typesafe.akka    akka-http_2.12    10.1.11
com.typesafe.akka    akka-stream_2.12    2.5.31
org.scala-lang    scala-library    2.12.11
Dependency tree
com.pauldijou    jwt-core_2.12    3.0.1
    org.scala-lang    scala-library    2.12.11
com.typesafe.akka    akka-http-spray-json_2.12    10.1.11
    com.typesafe.akka    akka-http_2.12    10.1.11
        com.typesafe.akka    akka-http-core_2.12    10.1.11
            com.typesafe.akka    akka-parsing_2.12    10.1.11
                org.scala-lang    scala-library    2.12.11
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    io.spray    spray-json_2.12    1.3.5
        org.scala-lang    scala-library    2.12.11
    org.scala-lang    scala-library    2.12.11
com.typesafe.akka    akka-http_2.12    10.1.11
    com.typesafe.akka    akka-http-core_2.12    10.1.11
        com.typesafe.akka    akka-parsing_2.12    10.1.11
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    org.scala-lang    scala-library    2.12.11
com.typesafe.akka    akka-stream_2.12    2.5.31
    com.typesafe.akka    akka-actor_2.12    2.5.31
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    com.typesafe.akka    akka-protobuf_2.12    2.5.31
        org.scala-lang    scala-library    2.12.11
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
            org.scala-lang    scala-library    2.12.11
        org.scala-lang    scala-library    2.12.11
    org.reactivestreams    reactive-streams    1.0.2
    org.scala-lang    scala-library    2.12.11
org.scala-lang    scala-library    2.12.11

Usage

Prepare your credentials for access to Google Cloud Pub/Sub.

Scala
val privateKey =
  """-----BEGIN RSA PRIVATE KEY-----
    |MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2
    |9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh
    |Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA
    |2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD
    |uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW
    |fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz
    |Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9
    |-----END RSA PRIVATE KEY-----""".stripMargin
val clientEmail = "[email protected]"
val projectId = "test-XXXXX"
val apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE"

val config = PubSubConfig(projectId, clientEmail, privateKey)

val topic = "topic1"
val subscription = "subscription1"
Java
String privateKey =
    "-----BEGIN RSA PRIVATE KEY-----\n"
        + "MIIBOgIBAAJBAJHPYfmEpShPxAGP12oyPg0CiL1zmd2V84K5dgzhR9TFpkAp2kl2\n"
        + "9BTc8jbAY0dQW4Zux+hyKxd6uANBKHOWacUCAwEAAQJAQVyXbMS7TGDFWnXieKZh\n"
        + "Dm/uYA6sEJqheB4u/wMVshjcQdHbi6Rr0kv7dCLbJz2v9bVmFu5i8aFnJy1MJOpA\n"
        + "2QIhAPyEAaVfDqJGjVfryZDCaxrsREmdKDlmIppFy78/d8DHAiEAk9JyTHcapckD\n"
        + "uSyaE6EaqKKfyRwSfUGO1VJXmPjPDRMCIF9N900SDnTiye/4FxBiwIfdynw6K3dW\n"
        + "fBLb6uVYr/r7AiBUu/p26IMm6y4uNGnxvJSqe+X6AxR6Jl043OWHs4AEbwIhANuz\n"
        + "Ay3MKOeoVbx0L+ruVRY5fkW+oLHbMGtQ9dZq7Dp9\n"
        + "-----END RSA PRIVATE KEY-----";

String clientEmail = "[email protected]";
String projectId = "test-XXXXX";
String apiKey = "AIzaSyCVvqrlz057gCssc70n5JERyTW4TpB4ebE";

PubSubConfig config = PubSubConfig.create(projectId, clientEmail, privateKey, system);

String topic = "topic1";
String subscription = "subscription1";

Prepare the actor system and materializer. The Java PubSubConfig.create call above takes this system as its last argument, so create it first.

Scala
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
Java
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

To publish a single request, build the message with a base64-encoded data payload and put it in a PublishRequest. GooglePubSub.publish creates a flow that takes publish requests and emits, for each request, the message IDs that were accepted.

Scala
val publishMessage =
  PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
PublishMessage publishMessage =
    PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), materializer);
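
The snippets above call getBytes without a charset, so the encoded payload depends on the platform default. As a minimal sketch that uses only the JDK, the payload string can be built with an explicit UTF-8 charset. The class and method names here are hypothetical and are not part of the Alpakka API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of Alpakka.
final class PayloadEncoder {
  // Encodes the text as UTF-8 bytes, then as a base64 string
  // that can be passed as the data of a PublishMessage.
  static String encode(String text) {
    return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // prints SGVsbG8gR29vZ2xlIQ==
    System.out.println(encode("Hello Google!"));
  }
}
```

The resulting string could then be handed to PublishMessage.create in place of the inline expression.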

For higher throughput, batch messages together. Here a batch is sent once it holds 1000 messages or once 1 minute has elapsed, whichever comes first.

Scala
val messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(grouped => PublishRequest(grouped))
  .via(publishFlow)
  .runWith(Sink.ignore) // run the stream; .to(...) alone only describes it
Java
Source<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.create(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), materializer);
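
To make the size bound of groupedWithin concrete, here is a rough JDK-only sketch that splits a list into batches of at most a given size. It models only the size limit and deliberately ignores the time window, and it is not how Akka Streams implements the operator. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the size limit only; no time window is modelled.
final class BatchSketch {
  // Splits items into consecutive batches of at most maxSize elements.
  static <T> List<List<T>> inBatches(List<T> items, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < items.size(); i += maxSize) {
      int end = Math.min(i + maxSize, items.size());
      batches.add(new ArrayList<>(items.subList(i, end)));
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      ids.add(i);
    }
    // prints 1000, 1000 and 500 on separate lines
    for (List<Integer> batch : inBatches(ids, 1000)) {
      System.out.println(batch.size());
    }
  }
}
```

In the stream itself the last, smaller batch would be emitted when the time window closes or the source completes, rather than at the end of a list.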

To consume messages from a subscription, subscribe and then acknowledge the received messages. The snippets below only define the stream; it starts consuming once the graph is run.

Scala
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
Source<ReceivedMessage, Cancellable> subscriptionSource =
    GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.create(acks))
    .to(ackSink);
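
One thing the "do something fun" step might do is decode the payload. The sketch below uses only the JDK and assumes the received data is a base64 string of UTF-8 text, as in the publishing example. How the string is read from the ReceivedMessage is left out, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of Alpakka.
final class PayloadDecoder {
  // Decodes a base64 string back into UTF-8 text.
  // Throws IllegalArgumentException if the input is not valid base64.
  static String decode(String base64Data) {
    byte[] bytes = Base64.getDecoder().decode(base64Data);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // prints Hello Google!
    System.out.println(decode("SGVsbG8gR29vZ2xlIQ=="));
  }
}
```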

If you want to acknowledge messages automatically and also send each ReceivedMessage to your own sink, you can create a graph.

Scala
val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = ??? // supply your subscription source
val processMessage: Sink[ReceivedMessage, NotUsed] = ??? // supply your processing sink

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
Java
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.create(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);

Running the examples

To run the example code you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.
