Google Cloud Pub/Sub


Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.

Further information at the official Google Cloud documentation website.

This connector communicates with Pub/Sub via HTTP requests. For a connector that uses gRPC for the communication, see the alternative Alpakka Google Cloud Pub/Sub gRPC connector.

Project Info: Alpakka Google Cloud PubSub
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12
JPMS module
Readiness level (since 0.7, 2017-03-31)
Home page
API documentation
Release notes: GitHub releases
Issues: GitHub issues


The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt:
resolvers += "Akka library repository".at("")
Maven:
      <name>Akka library repository</name>
Gradle:
repositories {
    maven {
        url ""
    }
}

Additionally, add the dependencies as below.

sbt:
val AkkaVersion = "2.10.0-M1"
val AkkaHttpVersion = "10.7.0-M1"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "9.0.0-M1+1-4049dca2-SNAPSHOT",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
)
Gradle:
def versions = [
  AkkaVersion: "2.10.0-M1",
  AkkaHttpVersion: "10.7.0-M1",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-pub-sub_${versions.ScalaBinary}:9.0.0-M1+1-4049dca2-SNAPSHOT"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
  implementation "com.typesafe.akka:akka-http-spray-json_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The Pub/Sub connector shares its basic configuration with all the Google connectors in Alpakka. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.

To use the connector, first prepare the actor system and the Pub/Sub configuration.

Scala:
implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
Java:
ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";

To publish a single request, build the message with a base64 data payload and put it in a PublishRequest. Publishing creates a flow taking the messages and returning the accepted message ids.

Scala:
val publishMessage =
  PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java:
PublishMessage publishMessage =
    PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);
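
The publish examples above call getBytes without a charset, which on older JDKs falls back to the platform default. As a minimal sketch using only the JDK, the helper below encodes a text payload as base64 with an explicit UTF-8 charset and decodes it again. PayloadCodec is a hypothetical name used for illustration and is not part of Alpakka; the encoded string is the kind of value the examples pass to PublishMessage.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not part of Alpakka): base64 handling for text payloads.
final class PayloadCodec {
  private PayloadCodec() {}

  // Encodes text as base64 using UTF-8, independent of the platform default charset.
  static String encode(String text) {
    return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
  }

  // Decodes a base64 payload back to text, assuming it holds UTF-8 encoded bytes.
  static String decode(String base64) {
    return new String(Base64.getDecoder().decode(base64), StandardCharsets.UTF_8);
  }
}
```

With this helper, PayloadCodec.encode("Hello Google!") yields "SGVsbG8gR29vZ2xlIQ==", and decode reverses it.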

For better throughput you can batch messages together. Here each batch contains at most 1000 messages and is sent after at most 1 minute, whichever limit is reached first; which one applies depends on how fast the source emits.

Scala:
val messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(grouped => PublishRequest(grouped))
  .via(publishFlow)
  .runWith(Sink.ignore)
Java:
Source<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.create(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), system);
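
To make the "size or time, whichever comes first" rule concrete, the following plain-Java sketch splits a list of arrival timestamps into batches. It is a simplified model under stated assumptions, not how groupedWithin is implemented: BatchSketch is a hypothetical name, the time window is measured from the first element of each batch, and a batch is only closed when the next element arrives, whereas the real operator is timer-driven and emits without waiting for another element.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of size-or-time batching (not Akka's implementation).
final class BatchSketch {
  private BatchSketch() {}

  // arrivalsMs: ascending arrival times in milliseconds.
  // A batch closes when it holds maxSize items, or when the next item
  // arrives maxWaitMs or more after the batch's first item.
  static List<List<Long>> batch(List<Long> arrivalsMs, int maxSize, long maxWaitMs) {
    List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    for (long t : arrivalsMs) {
      if (!current.isEmpty() && t - current.get(0) >= maxWaitMs) {
        batches.add(current); // time limit reached
        current = new ArrayList<>();
      }
      current.add(t);
      if (current.size() == maxSize) {
        batches.add(current); // size limit reached
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // flush the remainder at end of input
    }
    return batches;
  }
}
```

For arrivals at 0, 10, 20, 30 and 70000 ms with a maximum size of 3 and a 60000 ms window, the model produces three batches: the first closes on size, the second on time, and the last is flushed at the end of the input.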

To consume messages from a subscription, you must subscribe and then acknowledge the received messages.

Scala:
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun

    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java:
Source<ReceivedMessage, Cancellable> subscriptionSource =
    GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.create(acks))
    .to(ackSink);

If you want to automatically acknowledge the messages and send the ReceivedMessages to your own sink you can create a graph.

Scala:
val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = ??? // your message source
val processMessage: Sink[ReceivedMessage, NotUsed] = ??? // your processing sink

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
Java:
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.create(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);


Running the examples

To run the example code, you need to set up a project with Pub/Sub in Google Cloud and provide your own credentials.

