Azure Storage Queue
The Azure Storage Queue connector provides an Akka Stream Source and Sinks for Azure Storage Queue integration.
Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue, see the Azure docs.
| Project Info: Alpakka Azure Storage Queue | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-azure-storage-queue : 10.0.1 |
| JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
| Scala versions | 2.13.17, 3.3.7 |
| JPMS module name | akka.stream.alpakka.azure.storagequeue |
| License | |
| Readiness level | Since 0.9, 2017-05-24 |
| Home page | https://doc.akka.io/libraries/alpakka/current |
| API documentation | |
| Forums | |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
Note
The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.
Additionally, add the dependencies as below.
- sbt

      val AkkaVersion = "2.10.11"
      libraryDependencies ++= Seq(
        "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage-queue" % "10.0.1",
        "com.typesafe.akka" %% "akka-stream" % AkkaVersion
      )

- Maven

      <properties>
        <akka.version>2.10.11</akka.version>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-stream-alpakka-azure-storage-queue_${scala.binary.version}</artifactId>
          <version>10.0.1</version>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-stream_${scala.binary.version}</artifactId>
          <version>${akka.version}</version>
        </dependency>
      </dependencies>

- Gradle

      def versions = [
        AkkaVersion: "2.10.11",
        ScalaBinary: "2.13"
      ]
      dependencies {
        implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage-queue_${versions.ScalaBinary}:10.0.1"
        implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
      }
The first table below shows the direct dependencies of this module; the second lists all libraries it depends on transitively.

- Direct dependencies

| Organization | Artifact | Version |
|---|---|---|
| com.microsoft.azure | azure-storage | 8.6.6 |
| com.typesafe.akka | akka-stream_2.13 | 2.10.11 |
| org.scala-lang | scala-library | 2.13.17 |

- Dependency tree

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.microsoft.azure | azure-storage | 8.6.6 | The Apache Software License, Version 2.0 |
| com.fasterxml.jackson.core | jackson-core | 2.9.4 | The Apache Software License, Version 2.0 |
| com.microsoft.azure | azure-keyvault-core | 1.2.4 | The MIT License (MIT) |
| com.google.guava | guava | 24.1.1-jre | The Apache Software License, Version 2.0 |
| com.google.code.findbugs | jsr305 | 1.3.9 | The Apache Software License, Version 2.0 |
| com.google.errorprone | error_prone_annotations | 2.1.3 | Apache 2.0 |
| com.google.j2objc | j2objc-annotations | 1.1 | The Apache Software License, Version 2.0 |
| org.checkerframework | checker-compat-qual | 2.0.0 | GNU General Public License, version 2 (GPL2), with the classpath exception |
| org.codehaus.mojo | animal-sniffer-annotations | 1.14 | MIT license |
| org.apache.commons | commons-lang3 | 3.8.1 | Apache License, Version 2.0 |
| org.apache.commons | commons-lang3 | 3.8.1 | Apache License, Version 2.0 |
| org.slf4j | slf4j-api | 1.7.12 | MIT License |
| com.typesafe.akka | akka-stream_2.13 | 2.10.11 | BUSL-1.1 |
| com.typesafe.akka | akka-actor_2.13 | 2.10.11 | BUSL-1.1 |
| com.typesafe | config | 1.4.5 | Apache-2.0 |
| org.scala-lang | scala-library | 2.13.17 | Apache-2.0 |
| com.typesafe.akka | akka-protobuf-v3_2.13 | 2.10.11 | BUSL-1.1 |
| org.reactivestreams | reactive-streams | 1.0.4 | MIT-0 |
| org.scala-lang | scala-library | 2.13.17 | Apache-2.0 |
| org.scala-lang | scala-library | 2.13.17 | Apache-2.0 |
Init Azure Storage API
import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._

val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"

// Use a factory that builds a fresh queue reference on each call,
// since the Azure Storage SDK is not guaranteed to be thread-safe
val queueFactory = () => {
  val storageAccount = CloudStorageAccount.parse(storageConnectionString)
  val queueClient = storageAccount.createCloudQueueClient
  queueClient.getQueueReference("myqueue") // Azure queue names must be lowercase
}
For more details, see Microsoft Azure Storage Docs.
Queuing a message
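import akka.stream.alpakka.azure.storagequeue._
import akka.stream.alpakka.azure.storagequeue.scaladsl._
import akka.stream.scaladsl.Source

// Running the stream requires an implicit ActorSystem in scope

// Create an example message
val message = new CloudQueueMessage("Hello Azure")
Source.single(message).runWith(AzureQueueSink(queueFactory))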
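The fragment above sends a single message. As a minimal sketch of sending several, the helper below wraps each string in a `CloudQueueMessage` and streams them into `AzureQueueSink`. The object and method names (`EnqueueSketch`, `enqueueAll`) are hypothetical, and the imports assume the connector lives in the `akka.stream.alpakka.azure.storagequeue` package named in the project info table.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.azure.storagequeue.scaladsl.AzureQueueSink
import akka.stream.scaladsl.Source
import com.microsoft.azure.storage.queue.{CloudQueue, CloudQueueMessage}

import scala.concurrent.Future

object EnqueueSketch {
  // Hypothetical helper, not part of the connector:
  // sends every string in `texts` as its own queue message.
  def enqueueAll(texts: List[String], queueFactory: () => CloudQueue)(
      implicit system: ActorSystem): Future[Done] =
    Source(texts)
      .map(text => new CloudQueueMessage(text)) // one message per string
      .runWith(AzureQueueSink(queueFactory))    // completes when all are sent
}
```

With an implicit `ActorSystem` in scope, a call such as `EnqueueSketch.enqueueAll(List("Hello Azure", "Hello again"), queueFactory)` returns a `Future[Done]` that completes once the stream has finished, or fails if a send fails.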
Processing and deleting messages
AzureQueueSource(queueFactory)
  .take(10)
  .map { (msg: CloudQueueMessage) =>
    println(msg.getMessageContentAsString) // Print the message's content
    msg // Pass the message downstream for deletion
  }
  .runWith(AzureQueueDeleteSink(queueFactory))
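The same read, process, and delete pattern can be wrapped in a reusable function. This is a sketch under assumptions rather than connector API: `ProcessSketch`, `processAndDelete`, and the `handle` callback are hypothetical names, and the imports again assume the `akka.stream.alpakka.azure.storagequeue` package.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.azure.storagequeue.scaladsl.{AzureQueueDeleteSink, AzureQueueSource}
import com.microsoft.azure.storage.queue.CloudQueue

import scala.concurrent.Future

object ProcessSketch {
  // Hypothetical helper: reads up to `maxMessages` messages, passes each
  // message body to `handle`, then deletes the message from the queue.
  def processAndDelete(queueFactory: () => CloudQueue, maxMessages: Int)(
      handle: String => Unit)(implicit system: ActorSystem): Future[Done] =
    AzureQueueSource(queueFactory)
      .take(maxMessages.toLong)
      .map { msg =>
        handle(msg.getMessageContentAsString) // process the message body
        msg                                   // forward the message for deletion
      }
      .runWith(AzureQueueDeleteSink(queueFactory))
}
```

Because deletion happens downstream of `handle`, an exception thrown by `handle` fails the stream before that message reaches the delete sink, so the message is left on the queue rather than removed.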