Google Cloud Pub/Sub

Note

Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information at the official Google Cloud documentation website.

This connector communicates to Pub/Sub via HTTP requests (i.e. https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, take a look at the alternative Alpakka Google Cloud Pub/Sub gRPC connector.

Project Info: Alpakka Google Cloud PubSub
Artifact
com.lightbend.akka
akka-stream-alpakka-google-cloud-pub-sub
3.0.4
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.12.11, 2.13.3
JPMS module nameakka.stream.alpakka.google.cloud.pubsub
License
Readiness level
Since 0.7, 2017-03-31
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.6.14"
val AkkaHttpVersion = "10.1.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "3.0.4",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.6.14</akka.version>
  <akka.http.version>10.1.11</akka.http.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-google-cloud-pub-sub_${scala.binary.version}</artifactId>
    <version>3.0.4</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.6.14",
  AkkaHttpVersion: "10.1.11",
  ScalaBinary: "2.12"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-pub-sub_${versions.ScalaBinary}:3.0.4"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
  implementation "com.typesafe.akka:akka-http-spray-json_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.lightbend.akkaakka-stream-alpakka-google-common_2.123.0.4
com.typesafe.akkaakka-http-spray-json_2.1210.1.11
com.typesafe.akkaakka-http_2.1210.1.11
com.typesafe.akkaakka-stream_2.122.6.14
org.scala-langscala-library2.12.11
Dependency tree
com.lightbend.akka    akka-stream-alpakka-google-common_2.12    3.0.4
    com.github.jwt-scala    jwt-spray-json_2.12    7.1.0    Apache-2.0
        com.github.jwt-scala    jwt-json-common_2.12    7.1.0    Apache-2.0
            com.github.jwt-scala    jwt-core_2.12    7.1.0    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        io.spray    spray-json_2.12    1.3.6    Apache 2
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.google.auth    google-auth-library-credentials    0.24.1
    com.typesafe.akka    akka-http-spray-json_2.12    10.1.11    Apache-2.0
        com.typesafe.akka    akka-http_2.12    10.1.11    Apache-2.0
            com.typesafe.akka    akka-http-core_2.12    10.1.11    Apache-2.0
                com.typesafe.akka    akka-parsing_2.12    10.1.11    Apache-2.0
                    org.scala-lang    scala-library    2.12.11    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        io.spray    spray-json_2.12    1.3.6    Apache 2
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-http_2.12    10.1.11    Apache-2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.11    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.11    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-stream_2.12    2.6.14    Apache-2.0
        com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        com.typesafe.akka    akka-protobuf-v3_2.12    2.6.14    Apache-2.0
        com.typesafe    ssl-config-core_2.12    0.4.2    Apache-2.0
            com.typesafe    config    1.4.0    Apache-2.0
            org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.reactivestreams    reactive-streams    1.0.3    CC0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
com.typesafe.akka    akka-http-spray-json_2.12    10.1.11    Apache-2.0
    com.typesafe.akka    akka-http_2.12    10.1.11    Apache-2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.11    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.11    Apache-2.0
                org.scala-lang    scala-library    2.12.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    io.spray    spray-json_2.12    1.3.6    Apache 2
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
com.typesafe.akka    akka-http_2.12    10.1.11    Apache-2.0
    com.typesafe.akka    akka-http-core_2.12    10.1.11    Apache-2.0
        com.typesafe.akka    akka-parsing_2.12    10.1.11    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
com.typesafe.akka    akka-stream_2.12    2.6.14    Apache-2.0
    com.typesafe.akka    akka-actor_2.12    2.6.14    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.12    2.6.14    Apache-2.0
    com.typesafe    ssl-config-core_2.12    0.4.2    Apache-2.0
        com.typesafe    config    1.4.0    Apache-2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2    Apache-2.0
            org.scala-lang    scala-library    2.12.11    Apache-2.0
        org.scala-lang    scala-library    2.12.11    Apache-2.0
    org.reactivestreams    reactive-streams    1.0.3    CC0
    org.scala-lang    scala-library    2.12.11    Apache-2.0
org.scala-lang    scala-library    2.12.11    Apache-2.0

Usage

The Pub/Sub connector shares its basic configuration with all the Google connectors in Alpakka. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.

And prepare the actor system.

Scala
sourceimplicit val system = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
Java
sourceActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";

To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.

Scala
sourceval publishMessage =
  PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
sourcePublishMessage publishMessage =
    PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);

To get greater performance you can batch messages together, here we send batches with a maximum size of 1000 or at a maximum of 1 minute apart depending on the source.

Scala
sourceval messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(grouped => PublishRequest(grouped))
  .via(publishFlow)
  .to(Sink.seq)
Java
sourceSource<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.create(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), system);

To consume the messages from a subscription you must subscribe then acknowledge the received messages. PublishRequest

Scala
sourceval subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
sourceSource<ReceivedMessage, Cancellable> subscriptionSource =
    GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.create(acks))
    .to(ackSink);

If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.

Scala
sourceval subscribeMessageSoruce: Source[ReceivedMessage, NotUsed] = // ???
val processMessage: Sink[ReceivedMessage, NotUsed] = // ???

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSoruce.alsoTo(batchAckSink).to(processMessage)
Java
sourceSink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.create(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);

Running the examples

To run the example code you will need to configure a project and pub/sub in google cloud and provide your own credentials.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.