Azure Storage Queue
The Azure Storage Queue connector provides an Akka Stream Source and Sinks for Azure Storage Queue integration.
Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.
Project Info: Alpakka Azure Storage Queue | |
---|---|
Artifact | com.lightbend.akka : akka-stream-alpakka-azure-storage-queue : 9.0.1 |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.4 |
JPMS module name | akka.stream.alpakka.azure.storagequeue |
License | |
Readiness level | Since 0.9, 2017-05-24 |
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
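As a rough sketch for an sbt build, the repository can be registered as a resolver. The resolver name and the URL below are assumptions that are not stated on this page; check the Akka documentation for the current URL and any access requirements before relying on them.

```scala
// build.sbt
// Assumed repository URL; verify it against the Akka documentation.
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
```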
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage-queue" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
- Maven
- Gradle
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.microsoft.azure | azure-storage | 8.6.6 |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
org.scala-lang | scala-library | 2.13.12 |
- Dependency tree
Init Azure Storage API
import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._

val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"

// Build a new queue reference on every call, since the Azure Storage SDK is not guaranteed to be thread-safe
val queueFactory = () => {
  val storageAccount = CloudStorageAccount.parse(storageConnectionString)
  val queueClient = storageAccount.createCloudQueueClient()
  queueClient.getQueueReference("myqueue") // Azure queue names must be lowercase
}
For more details, see Microsoft Azure Storage Docs.
Queuing a message
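import akka.stream.alpakka.azure.storagequeue._
import akka.stream.alpakka.azure.storagequeue.scaladsl._
import akka.stream.scaladsl.Source

// Create an example message
val message = new CloudQueueMessage("Hello Azure")

// Running the stream requires an implicit ActorSystem (or Materializer) in scope
Source.single(message).runWith(AzureQueueSink(queueFactory))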
Processing and deleting messages
AzureQueueSource(queueFactory)
  .take(10)
  .map { msg =>
    println(msg.getMessageContentAsString) // Print the message's content
    msg // Pass the message downstream for deletion
  }
  .runWith(AzureQueueDeleteSink(queueFactory))
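The snippets above leave out the surrounding wiring. The following is a minimal sketch of how they might be combined into one program, assuming the queue already exists and the placeholder connection string is replaced with real credentials. The object name `QueueRoundTrip`, the method `roundTrip`, the actor system name, and the queue name `myqueue` are hypothetical and chosen for illustration only.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.azure.storagequeue.scaladsl.{AzureQueueDeleteSink, AzureQueueSink, AzureQueueSource}
import akka.stream.scaladsl.Source
import com.microsoft.azure.storage.CloudStorageAccount
import com.microsoft.azure.storage.queue.{CloudQueue, CloudQueueMessage}

import scala.concurrent.Future

object QueueRoundTrip {
  // Placeholder credentials: replace with a real connection string
  val storageConnectionString =
    "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"

  // Assumed queue name; Azure only accepts lowercase letters, digits and hyphens
  val queueName = "myqueue"

  // A fresh queue reference per call, because the SDK is not guaranteed to be thread-safe
  val queueFactory: () => CloudQueue = () =>
    CloudStorageAccount
      .parse(storageConnectionString)
      .createCloudQueueClient()
      .getQueueReference(queueName)

  // Enqueue one message, then read one message back, print it, and delete it
  def roundTrip()(implicit system: ActorSystem): Future[Done] = {
    import system.dispatcher // ExecutionContext for chaining the two futures

    val enqueued: Future[Done] =
      Source.single(new CloudQueueMessage("Hello Azure")).runWith(AzureQueueSink(queueFactory))

    enqueued.flatMap { _ =>
      AzureQueueSource(queueFactory)
        .take(1)
        .map { msg =>
          println(msg.getMessageContentAsString)
          msg // Pass the message on so the delete sink can remove it
        }
        .runWith(AzureQueueDeleteSink(queueFactory))
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("azure-queue-example")
    // Shut the actor system down once the round trip finishes, whether it succeeded or failed
    roundTrip().onComplete(_ => system.terminate())(system.dispatcher)
  }
}
```

Chaining the two streams with `flatMap` ensures the message has been enqueued before the source starts polling. In a long-running application the source and sinks would more likely run as independent streams.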