Azure Storage Queue

The Azure Storage Queue connector provides an Akka Stream Source and Sinks for Azure Storage Queue integration.

Azure Storage Queue is a queuing service similar to Amazon’s SQS. It is designed mostly for long-running and non-time-critical tasks. For more information on Azure Storage Queue see the Azure docs.

Project Info: Alpakka Azure Storage Queue
Artifact: com.lightbend.akka : akka-stream-alpakka-azure-storage-queue : 7.0.2
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.1
JPMS module name: akka.stream.alpakka.azure.storagequeue
Readiness level: since 0.9, 2017-05-24
Home page: https://doc.akka.io/docs/alpakka/current
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them, configure the URL for this repository in your build.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.9.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage-queue" % "7.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.9.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-azure-storage-queue_${scala.binary.version}</artifactId>
    <version>7.0.2</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.9.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage-queue_${versions.ScalaBinary}:7.0.2"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below lists the direct dependencies of this module; the dependency tree after it lists all libraries the module depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.microsoft.azure    azure-storage    8.0.0
com.typesafe.akka    akka-stream_2.13    2.9.0
org.scala-lang    scala-library    2.13.12
Dependency tree
com.microsoft.azure    azure-storage    8.0.0    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-core    2.9.4
    com.microsoft.azure    azure-keyvault-core    1.0.0    The MIT License (MIT)
        com.google.guava    guava    20.0
        org.apache.commons    commons-lang3    3.4
    org.apache.commons    commons-lang3    3.4
    org.slf4j    slf4j-api    1.7.12
com.typesafe.akka    akka-stream_2.13    2.9.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.9.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang.modules    scala-java8-compat_2.13    1.0.2    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.9.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Init Azure Storage API

import com.microsoft.azure.storage._
import com.microsoft.azure.storage.queue._
val storageConnectionString = "DefaultEndpointsProtocol=http;AccountName=<YourAccountName>;AccountKey=<YourKey>"
val queueFactory = () => { // Build a new reference per call, since the Azure Storage SDK is not guaranteed to be thread-safe
  val storageAccount = CloudStorageAccount.parse(storageConnectionString)
  val queueClient = storageAccount.createCloudQueueClient
  queueClient.getQueueReference("myqueue") // Azure queue names must be lowercase
}

For more details, see Microsoft Azure Storage Docs.
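
The factory above hard-codes the connection string. As a minimal sketch, assuming the connection string is supplied through an environment variable, the setup could look like the following. The variable name `AZURE_STORAGE_CONNECTION_STRING`, the `QueueSetup` object, and its helper methods are illustrative names, not part of the connector. `createIfNotExists()` comes from the Azure Storage SDK's `CloudQueue` and performs a network call.

```scala
import com.microsoft.azure.storage.CloudStorageAccount
import com.microsoft.azure.storage.queue.CloudQueue

object QueueSetup {
  // Hypothetical variable name; use whatever your deployment provides.
  val ConnectionStringVar = "AZURE_STORAGE_CONNECTION_STRING"

  // Look up the connection string and fail fast with a readable message if it is missing.
  def connectionString(env: Map[String, String] = sys.env): String =
    env.getOrElse(ConnectionStringVar, sys.error(s"$ConnectionStringVar is not set"))

  // Return a factory that builds a fresh CloudQueue reference on every call.
  def queueFactory(queueName: String): () => CloudQueue = () => {
    val account = CloudStorageAccount.parse(connectionString())
    account.createCloudQueueClient.getQueueReference(queueName)
  }

  // Create the queue if it does not exist yet; returns true when it was created.
  def ensureQueueExists(queueName: String): Boolean =
    queueFactory(queueName)().createIfNotExists()
}
```

Calling `QueueSetup.ensureQueueExists("myqueue")` once at startup avoids failures on the first write to a queue that has not been created yet.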

Queuing a message

import akka.stream.alpakka.azure.storagequeue._
import akka.stream.alpakka.azure.storagequeue.scaladsl._
import akka.stream.scaladsl.Source

// Create an example message
val message = new CloudQueueMessage("Hello Azure")

Source.single(message).runWith(AzureQueueSink(queueFactory))
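
The snippet above needs an implicit `ActorSystem` in scope and does not wait for the write to finish. The following is a sketch of a complete program that enqueues several messages and waits for the stream to complete. It assumes the connector classes live in `akka.stream.alpakka.azure.storagequeue.scaladsl` (matching the module name listed above) and that the sink materializes a `Future[Done]`. The object name, the environment variable, the queue name, and the 30-second timeout are illustrative choices.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.azure.storagequeue.scaladsl.AzureQueueSink
import akka.stream.scaladsl.Source
import com.microsoft.azure.storage.CloudStorageAccount
import com.microsoft.azure.storage.queue.{CloudQueue, CloudQueueMessage}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object EnqueueExample {
  // Wrap plain strings in queue messages.
  def toMessages(texts: List[String]): List[CloudQueueMessage] =
    texts.map(text => new CloudQueueMessage(text))

  def main(args: Array[String]): Unit = {
    // The implicit system also provides the stream materializer.
    implicit val system: ActorSystem = ActorSystem("enqueue-example")

    // Hypothetical environment variable and queue name.
    val queueFactory: () => CloudQueue = () =>
      CloudStorageAccount
        .parse(sys.env("AZURE_STORAGE_CONNECTION_STRING"))
        .createCloudQueueClient
        .getQueueReference("myqueue")

    // The materialized future completes once every message has been handed to the queue.
    val done: Future[Done] =
      Source(toMessages(List("first", "second", "third")))
        .runWith(AzureQueueSink(queueFactory))

    try Await.result(done, 30.seconds)
    finally system.terminate()
  }
}
```

Blocking with `Await` keeps the example short; in a long-running service you would normally react to the future instead.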

Processing and deleting messages

AzureQueueSource(queueFactory)
  .take(10)
  .map { (msg: CloudQueueMessage) =>
    println(msg.getMessageContentAsString) // Print the message's content
    msg                                    // Pass the message downstream for deletion
  }
  .runWith(AzureQueueDeleteSink(queueFactory))
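
Only messages that reach `AzureQueueDeleteSink` are deleted. A message that is received but never deleted becomes visible in the queue again once its visibility timeout expires. The sketch below uses this to retry failed work: messages whose handler fails are filtered out before the delete sink. The `handle` function, the object name, the environment variable, and the queue name are hypothetical, and the connector package is assumed to be `akka.stream.alpakka.azure.storagequeue.scaladsl`.

```scala
import akka.actor.ActorSystem
import akka.stream.alpakka.azure.storagequeue.scaladsl.{AzureQueueDeleteSink, AzureQueueSource}
import com.microsoft.azure.storage.CloudStorageAccount
import com.microsoft.azure.storage.queue.{CloudQueue, CloudQueueMessage}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object ProcessExample {
  // Hypothetical handler: rejects empty payloads, otherwise returns the payload length.
  def handle(content: String): Try[Int] =
    Try {
      require(content.nonEmpty, "empty message")
      content.length
    }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("process-example")

    // Hypothetical environment variable and queue name.
    val queueFactory: () => CloudQueue = () =>
      CloudStorageAccount
        .parse(sys.env("AZURE_STORAGE_CONNECTION_STRING"))
        .createCloudQueueClient
        .getQueueReference("myqueue")

    val done = AzureQueueSource(queueFactory)
      .take(10)
      .filter { (msg: CloudQueueMessage) =>
        // Reading the content can throw as well, so it is wrapped in Try too.
        Try(msg.getMessageContentAsString).flatMap(handle) match {
          case Success(_) => true // processed: forward to the delete sink
          case Failure(e) =>
            system.log.warning("Leaving message in queue: {}", e.getMessage)
            false // not deleted, so it reappears after the visibility timeout
        }
      }
      .runWith(AzureQueueDeleteSink(queueFactory))

    // The stream completes after 10 messages; the timeout is an arbitrary choice.
    try Await.result(done, 1.minute)
    finally system.terminate()
  }
}
```

This gives at-least-once processing, so the handler should tolerate seeing the same message more than once.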