Google Cloud Pub/Sub
Note
Google Cloud Pub/Sub provides many-to-many, asynchronous messaging that decouples senders and receivers.
Further information is available on the official Google Cloud documentation website.
This connector communicates with Pub/Sub via HTTP requests (i.e. to https://pubsub.googleapis.com). For a connector that uses gRPC for the communication, see the alternative Alpakka Google Cloud Pub/Sub gRPC connector.
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-google-cloud-pub-sub_${scala.binary.version}</artifactId>
    <version>9.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-spray-json_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
    AkkaVersion: "2.10.0",
    AkkaHttpVersion: "10.7.0",
    ScalaBinary: "2.13"
]
dependencies {
    implementation "com.lightbend.akka:akka-stream-alpakka-google-cloud-pub-sub_${versions.ScalaBinary}:9.0.1"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
    implementation "com.typesafe.akka:akka-http-spray-json_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Direct dependencies
Organization         Artifact                                  Version
com.lightbend.akka   akka-stream-alpakka-google-common_2.13    9.0.1
com.typesafe.akka    akka-http-spray-json_2.13                 10.7.0
com.typesafe.akka    akka-http_2.13                            10.7.0
com.typesafe.akka    akka-stream_2.13                          2.10.0
org.scala-lang       scala-library                             2.13.12
Dependency tree
com.lightbend.akka akka-stream-alpakka-google-common_2.13 9.0.1
  com.github.jwt-scala jwt-json-common_2.13 9.4.6 Apache-2.0
    com.github.jwt-scala jwt-core_2.13 9.4.6 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.google.auth google-auth-library-credentials 1.24.1
  com.typesafe.akka akka-http-spray-json_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
      com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
        com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
          org.scala-lang scala-library 2.13.12 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
      com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
        com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
        com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
          com.typesafe config 1.4.3 Apache-2.0
          org.scala-lang scala-library 2.13.12 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
        org.slf4j slf4j-api 2.0.16
      org.scala-lang scala-library 2.13.12 Apache-2.0
    io.spray spray-json_2.13 1.3.6 Apache 2
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
      com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
      com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
      com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
        com.typesafe config 1.4.3 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.slf4j slf4j-api 2.0.16
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
    com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
      com.typesafe config 1.4.3 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
    org.reactivestreams reactive-streams 1.0.4 MIT-0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-http-spray-json_2.13 10.7.0 BUSL-1.1
  com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
      com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
      com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
      com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
        com.typesafe config 1.4.3 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.slf4j slf4j-api 2.0.16
    org.scala-lang scala-library 2.13.12 Apache-2.0
  io.spray spray-json_2.13 1.3.6 Apache 2
    org.scala-lang scala-library 2.13.12 Apache-2.0
  org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
  com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
    com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
    com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
      com.typesafe config 1.4.3 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
    org.slf4j slf4j-api 2.0.16
  org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
  com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
    com.typesafe config 1.4.3 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
  org.reactivestreams reactive-streams 1.0.4 MIT-0
  org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
Usage
The Pub/Sub connector shares its basic configuration with all the Google connectors in Alpakka. Additional Pub/Sub-specific configuration settings can be found in its own reference.conf.
Then prepare the actor system.
Scala
import akka.actor.ActorSystem
import akka.stream.alpakka.googlecloud.pubsub._

implicit val system: ActorSystem = ActorSystem()
val config = PubSubConfig()
val topic = "topic1"
val subscription = "subscription1"
Java
import akka.actor.ActorSystem;
import akka.stream.alpakka.googlecloud.pubsub.*;

ActorSystem system = ActorSystem.create();
PubSubConfig config = PubSubConfig.create();
String topic = "topic1";
String subscription = "subscription1";
To publish a single request, build the message with a base64-encoded data payload and put it in a PublishRequest. Publishing creates a flow that takes publish requests and returns the accepted message IDs.
Scala
val publishMessage =
  PublishMessage(new String(Base64.getEncoder.encode("Hello Google!".getBytes)))
val publishRequest = PublishRequest(Seq(publishMessage))

val source: Source[PublishRequest, NotUsed] = Source.single(publishRequest)

val publishFlow: Flow[PublishRequest, Seq[String], NotUsed] =
  GooglePubSub.publish(topic, config)

val publishedMessageIds: Future[Seq[Seq[String]]] = source.via(publishFlow).runWith(Sink.seq)
Java
PublishMessage publishMessage =
    PublishMessage.create(new String(Base64.getEncoder().encode("Hello Google!".getBytes())));
PublishRequest publishRequest = PublishRequest.create(Lists.newArrayList(publishMessage));

Source<PublishRequest, NotUsed> source = Source.single(publishRequest);

Flow<PublishRequest, List<String>, NotUsed> publishFlow =
    GooglePubSub.publish(topic, config, 1);

CompletionStage<List<List<String>>> publishedMessageIds =
    source.via(publishFlow).runWith(Sink.seq(), system);
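The data payload in the snippets above is a base64 string. As a minimal sketch using only the JDK, the encoding step can be written with an explicit charset so the result does not depend on the platform default. The names `PayloadEncoding` and `encode` are illustrative and not part of the connector.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of Alpakka: prepares text for use as message data.
final class PayloadEncoding {

  // Encode the UTF-8 bytes of the text as a base64 string.
  static String encode(String text) {
    return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // The resulting string is what would be passed as the message data.
    System.out.println(encode("Hello Google!"));
  }
}
```

The encoded string could then be handed to `PublishMessage.create` in place of the inline expression used above.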
For better throughput you can batch messages together. Here a batch is sent once it holds 1000 messages or once 1 minute has elapsed, whichever happens first, depending on how fast the source emits.
Scala
val messageSource: Source[PublishMessage, NotUsed] = Source(List(publishMessage, publishMessage))
messageSource
  .groupedWithin(1000, 1.minute)
  .map(grouped => PublishRequest(grouped))
  .via(publishFlow)
  .to(Sink.seq) // only builds the graph; call .run() to start it
Java
Source<PublishMessage, NotUsed> messageSource = Source.single(publishMessage);
messageSource
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(messages -> PublishRequest.create(messages))
    .via(publishFlow)
    .runWith(Sink.ignore(), system);
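The size bound of the batching step can be pictured without streams. The following JDK-only sketch splits a list into consecutive batches of at most a given size. It deliberately ignores the time bound that `groupedWithin` also applies, and the names `Batching` and `batches` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Alpakka: size-only batching of a finite list.
final class Batching {

  // Split items into consecutive batches holding at most maxSize elements each.
  static <T> List<List<T>> batches(List<T> items, int maxSize) {
    if (maxSize <= 0) {
      throw new IllegalArgumentException("maxSize must be positive");
    }
    List<List<T>> result = new ArrayList<>();
    for (int start = 0; start < items.size(); start += maxSize) {
      int end = Math.min(start + maxSize, items.size());
      // Copy the slice so each batch is independent of the input list.
      result.add(new ArrayList<>(items.subList(start, end)));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Integer> ids = new ArrayList<>();
    for (int i = 0; i < 2500; i++) {
      ids.add(i);
    }
    // Print the size of each batch.
    for (List<Integer> batch : batches(ids, 1000)) {
      System.out.println(batch.size());
    }
  }
}
```

In the stream version each such batch becomes one PublishRequest, so fewer HTTP requests are needed for the same number of messages.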
To consume messages from a subscription, you must subscribe and then acknowledge the received messages.
Scala
val subscriptionSource: Source[ReceivedMessage, Cancellable] =
  GooglePubSub.subscribe(subscription, config)

val ackSink: Sink[AcknowledgeRequest, Future[Done]] =
  GooglePubSub.acknowledge(subscription, config)

subscriptionSource
  .map { message =>
    // do something fun
    message.ackId
  }
  .groupedWithin(1000, 1.minute)
  .map(AcknowledgeRequest.apply)
  .to(ackSink)
Java
Source<ReceivedMessage, Cancellable> subscriptionSource =
    GooglePubSub.subscribe(subscription, config);

Sink<AcknowledgeRequest, CompletionStage<Done>> ackSink =
    GooglePubSub.acknowledge(subscription, config);

subscriptionSource
    .map(
        message -> {
          // do something fun
          return message.ackId();
        })
    .groupedWithin(1000, Duration.ofMinutes(1))
    .map(acks -> AcknowledgeRequest.create(acks))
    .to(ackSink);
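A typical processing step decodes the payload before the message is acknowledged. Assuming the received data is the base64 string produced on the publishing side, this JDK-only sketch decodes it and returns an empty `Optional` for malformed input. `PayloadDecoding` and `decodeUtf8` are hypothetical names, not connector API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical helper, not part of Alpakka: turns base64 message data back into text.
final class PayloadDecoding {

  // Decode a base64 payload to UTF-8 text; empty if the input is not valid base64.
  static Optional<String> decodeUtf8(String base64Data) {
    try {
      byte[] bytes = Base64.getDecoder().decode(base64Data);
      return Optional.of(new String(bytes, StandardCharsets.UTF_8));
    } catch (IllegalArgumentException e) {
      // Base64.Decoder signals malformed input with IllegalArgumentException.
      return Optional.empty();
    }
  }

  public static void main(String[] args) {
    System.out.println(decodeUtf8("SGVsbG8gR29vZ2xlIQ=="));
    System.out.println(decodeUtf8("not base64!"));
  }
}
```

Returning an `Optional` keeps the decision about malformed payloads (skip, log, or fail the stream) with the caller.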
If you want to acknowledge the messages automatically and also send each ReceivedMessage to your own sink, you can create a graph.
Scala
val subscribeMessageSource: Source[ReceivedMessage, NotUsed] = ??? // supply your own source
val processMessage: Sink[ReceivedMessage, NotUsed] = ??? // supply your own sink

val batchAckSink =
  Flow[ReceivedMessage].map(_.ackId).groupedWithin(1000, 1.minute).map(AcknowledgeRequest.apply).to(ackSink)

val q = subscribeMessageSource.alsoTo(batchAckSink).to(processMessage)
Java
Sink<ReceivedMessage, CompletionStage<Done>> processSink = yourProcessingSink;

Sink<ReceivedMessage, NotUsed> batchAckSink =
    Flow.of(ReceivedMessage.class)
        .map(t -> t.ackId())
        .groupedWithin(1000, Duration.ofMinutes(1))
        .map(ids -> AcknowledgeRequest.create(ids))
        .to(ackSink);

subscriptionSource.alsoTo(batchAckSink).to(processSink);
Running the examples
To run the example code you will need to configure a project and Pub/Sub in Google Cloud and provide your own credentials.