New to Akka? Start with the Akka SDK.

Azure Storage Queue

The Azure Storage Queue connector provides an Akka Stream Source and Sinks for Azure Storage Queue integration.

Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.

Project Info: Alpakka Azure Storage Queue
Artifact
com.lightbend.akka
akka-stream-alpakka-azure-storage-queue
10.0.1
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Scala versions2.13.17, 3.3.7
JPMS module nameakka.stream.alpakka.azure.storagequeue
License
Readiness level
Since 0.9, 2017-05-24
Home pagehttps://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

Note

The Akka dependencies are available from Akka’s secure library repository. To access them you need to use a secure, tokenized URL as specified at https://account.akka.io/token.

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage-queue" % "10.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.11</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-azure-storage-queue_${scala.binary.version}</artifactId>
    <version>10.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.11",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage-queue_${versions.ScalaBinary}:10.0.1"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.microsoft.azureazure-storage8.6.6
com.typesafe.akkaakka-stream_2.132.10.11
org.scala-langscala-library2.13.17
Dependency tree
com.microsoft.azure    azure-storage    8.6.6    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.9.4    The Apache Software License, Version 2.0
    com.microsoft.azure    azure-keyvault-core    1.2.4    The MIT License (MIT)
        com.google.guava    guava    24.1.1-jre    The Apache Software License, Version 2.0
            com.google.code.findbugs    jsr305    1.3.9    The Apache Software License, Version 2.0
            com.google.errorprone    error_prone_annotations    2.1.3    Apache 2.0
            com.google.j2objc    j2objc-annotations    1.1    The Apache Software License, Version 2.0
            org.checkerframework    checker-compat-qual    2.0.0    GNU General Public License, version 2 (GPL2), with the classpath exception
            org.codehaus.mojo    animal-sniffer-annotations    1.14    MIT license
        org.apache.commons    commons-lang3    3.8.1    Apache License, Version 2.0
    org.apache.commons    commons-lang3    3.8.1    Apache License, Version 2.0
    org.slf4j    slf4j-api    1.7.12    MIT License
com.typesafe.akka    akka-stream_2.13    2.10.11    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.11    BUSL-1.1
        com.typesafe    config    1.4.5    Apache-2.0
        org.scala-lang    scala-library    2.13.17    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.11    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.17    Apache-2.0
org.scala-lang    scala-library    2.13.17    Apache-2.0

Init Azure Storage API

import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"
val queueFactory = () => { // Since azure storage JDK is not guaranteed to be thread-safe
  val storageAccount = CloudStorageAccount.parse(storageConnectionString)
  val queueClient = storageAccount.createCloudQueueClient
  queueClient.getQueueReference("myQueue")
}

For more details, see Microsoft Azure Storage Docs.

Queuing a message

import one.aleph.akkzure.queue._
import one.aleph.akkzure.queue.scaladsl._

// Create an example message
val message = new CloudQueueMessage("Hello Azure")

Source.single(message).runWith(AzureQueueSink(queueFactory))

Processing and deleting messages

AzureQueueSource(queueFactory).take(10)
.map({ msg: CloudQueueMessage =>
  println(msg.getMessageContentAsString) // Print the messages content
  msg                                    // Return message to the flow for deletion
}).runWith(AzureQueueDeleteSink(queueFactory))
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.