Azure Storage

The Azure Storage connector provides Akka Stream Sources for Azure Storage. It currently supports only the Blob and File services. For details about these services, please read the Azure docs.

Project Info: Alpakka Azure Storage
Artifact: com.lightbend.akka, akka-stream-alpakka-azure-storage, 9.0.0
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.3
JPMS module name: akka.stream.alpakka.azure.storage
License
Readiness level: Since 8.0.0, 2024-08-28
Home page: https://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notes: GitHub releases
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-xml" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-azure-storage_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-xml_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
  implementation "com.typesafe.akka:akka-http-xml_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version
com.typesafe.akka    akka-http-xml_2.13    10.7.0
com.typesafe.akka    akka-http_2.13    10.7.0
com.typesafe.akka    akka-stream_2.13    2.10.0
org.scala-lang    scala-library    2.13.12
Dependency tree
com.typesafe.akka    akka-http-xml_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.slf4j    slf4j-api    2.0.16
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang.modules    scala-xml_2.13    2.3.0    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
        com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.slf4j    slf4j-api    2.0.16
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Configuration

By default, the settings for the Azure Storage connector are read from the alpakka.azure-storage configuration section. Credentials are defined in the credentials section of reference.conf.

credentials {
  # valid values are anon (anonymous), SharedKey, and sas
  authorization-type = anon
  authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}

  # required for all authorization types
  account-name = ""
  account-name = ${?AZURE_STORAGE_ACCOUNT_NAME}

  # Account key is required for SharedKey or SharedKeyLite authorization
  account-key = none
  account-key = ${?AZURE_STORAGE_ACCOUNT_KEY}

  # SAS token for sas authorization
  sas-token = ""
  sas-token = ${?AZURE_STORAGE_SAS_TOKEN}
}

At a minimum, the following settings need to be configured:

  • authorization-type: the type of authorization to use, as described in the Azure documentation. Possible values are anon, SharedKey, or sas. Set the environment variable AZURE_STORAGE_AUTHORIZATION_TYPE to override this setting.
  • account-name: the name of the storage account that hosts the blob containers or file shares. Set the environment variable AZURE_STORAGE_ACCOUNT_NAME to override this setting.
  • account-key: the account key used to create the authorization signature. It is mandatory for the SharedKey or SharedKeyLite authorization types, as described in the Azure documentation. Set the environment variable AZURE_STORAGE_ACCOUNT_KEY to override this setting.
  • sas-token: the SAS token, required if the authorization type is sas. Set the environment variable AZURE_STORAGE_SAS_TOKEN to override this setting.
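
The rules in the list above can be summarized as a small check. The following is a minimal sketch in plain Java that does not use the connector; the class `CredentialsCheck` and its method are hypothetical names, and treating the default value `none` as "no key configured" is an assumption based on the defaults shown in reference.conf.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Alpakka API: reports which settings
// from the list above are still missing for a given authorization type.
final class CredentialsCheck {

  static List<String> missingSettings(
      String authorizationType, String accountName, String accountKey, String sasToken) {
    List<String> missing = new ArrayList<>();

    // account-name is required for all authorization types
    if (isBlank(accountName)) {
      missing.add("account-name");
    }

    // account-key is required for SharedKey and SharedKeyLite
    boolean sharedKey =
        "SharedKey".equals(authorizationType) || "SharedKeyLite".equals(authorizationType);
    // Assumption: the default value "none" means that no key was configured
    if (sharedKey && (isBlank(accountKey) || "none".equals(accountKey))) {
      missing.add("account-key");
    }

    // sas-token is required for sas
    if ("sas".equals(authorizationType) && isBlank(sasToken)) {
      missing.add("sas-token");
    }

    return missing;
  }

  private static boolean isBlank(String value) {
    return value == null || value.trim().isEmpty();
  }
}
```

For instance, `CredentialsCheck.missingSettings("SharedKey", "myaccount", "none", "")` reports that only account-key is missing, while anon needs nothing beyond account-name.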

Building request

Each function takes two parameters, objectPath and requestBuilder. The objectPath is a /-separated string giving the path of the blob or file, for example, my-container/my-blob or my-share/my-directory/my-file.
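
To make the objectPath convention concrete, here is a minimal sketch in plain Java. The class `ObjectPathSketch` and its methods are hypothetical and not part of the connector; they only illustrate that the first segment names the container or share and the remaining segments name the blob, directory, or file.

```java
// Hypothetical helper, not part of the Alpakka API.
final class ObjectPathSketch {

  // Returns the first path segment: the container (Blob) or share (File) name.
  static String root(String objectPath) {
    int slash = objectPath.indexOf('/');
    return slash < 0 ? objectPath : objectPath.substring(0, slash);
  }

  // Returns everything after the first '/', or "" when there is only one segment.
  static String remainder(String objectPath) {
    int slash = objectPath.indexOf('/');
    return slash < 0 ? "" : objectPath.substring(slash + 1);
  }

  // Builds an objectPath from its segments.
  static String join(String... segments) {
    return String.join("/", segments);
  }
}
```

With these helpers, `root("my-share/my-directory/my-file")` yields the share name and `remainder` yields `my-directory/my-file`.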

Each request builder is a subclass of RequestBuilder, which knows how to construct the request for the given operation.

Create simple request builder with default values

In this example, the GetBlob builder is initialized without any optional fields.

Scala
val requestBuilder = GetBlob()
Java
final GetBlob requestBuilder = GetBlob.create();

Create request builder initialized with optional fields

In this example, the GetBlob builder is initialized with the given leaseId and range fields.

Scala
val requestBuilder = GetBlob().withLeaseId("my-lease-id").withRange(ByteRange(0, 25))
Java
final var requestBuilder = GetBlob.create().withLeaseId("my-lease-id").withRange(ByteRange.createSlice(0, 25));
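
The range in the example above covers bytes 0 through 25. Assuming the slice follows standard HTTP byte-range semantics, where both ends are inclusive, it spans 26 bytes. The following plain-Java sketch illustrates that arithmetic; `ByteRangeSketch` is a hypothetical helper and is not part of Alpakka or Akka HTTP.

```java
// Hypothetical helper, not part of the Alpakka or Akka HTTP API.
final class ByteRangeSketch {

  // Renders an inclusive slice in HTTP byte-range syntax.
  static String headerValue(long first, long last) {
    if (first < 0 || last < first) {
      throw new IllegalArgumentException("expected 0 <= first <= last");
    }
    return "bytes=" + first + "-" + last;
  }

  // Number of bytes covered by the slice; both ends are included.
  static long length(long first, long last) {
    return last - first + 1;
  }
}
```

Here `headerValue(0, 25)` produces `bytes=0-25` and `length(0, 25)` is 26.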

Create request builder initialized with required fields

In this example, the CreateFile builder is initialized with the maxFileSize and contentType fields, which are required for the CreateFile operation.

Scala
val requestBuilder = CreateFile(256L, ContentTypes.`text/plain(UTF-8)`)
Java
final var requestBuilder = CreateFile.create(256L, ContentTypes.TEXT_PLAIN_UTF8);

Create request builder with ServerSideEncryption

ServerSideEncryption can be set on a request builder in a similar fashion.

Scala
val requestBuilder = PutBlockBlob(256L, ContentTypes.`text/plain(UTF-8)`)
  .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="))
Java
final var requestBuilder = PutBlockBlob.create(256L, ContentTypes.TEXT_PLAIN_UTF8)
        .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="));
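
The key string in the example above is the Base64 encoding of the text HelloWorld, so it is only a placeholder. Azure's documentation for customer-provided keys describes a Base64-encoded AES-256 key accompanied by a Base64-encoded SHA-256 hash of that key. The sketch below shows how both values could be produced with the JDK alone; `CustomerKeySketch` is a hypothetical helper, and whether the connector derives the hash itself is not stated here.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Base64;

// Hypothetical helper, not part of the Alpakka API.
final class CustomerKeySketch {

  // Generates 32 random bytes, the size of an AES-256 key.
  static byte[] newKeyBytes() {
    byte[] key = new byte[32];
    new SecureRandom().nextBytes(key);
    return key;
  }

  // Base64-encodes raw bytes, the form in which the key is passed as a string.
  static String base64(byte[] bytes) {
    return Base64.getEncoder().encodeToString(bytes);
  }

  // Base64-encoded SHA-256 digest of the raw key bytes.
  static String base64Sha256(byte[] keyBytes) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      return base64(digest.digest(keyBytes));
    } catch (NoSuchAlgorithmException e) {
      // SHA-256 is available on every standard Java platform
      throw new IllegalStateException(e);
    }
  }
}
```

The output of `base64(newKeyBytes())` is a string of the same shape as the one passed to customerKey in the example.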

Create request builder with additional headers

Some operations allow you to add additional headers. For example, for GetBlob you can specify the If-Match header so that the operation is performed only if the resource’s ETag matches the specified value. This is done by calling the addHeader function.

Scala
val requestBuilder = GetBlob().addHeader("If-Match", "foobar")
Java
final var requestBuilder = GetBlob.create().addHeader("If-Match", "foobar");
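
The If-Match check itself is performed by the server. As an illustration of the standard HTTP semantics, the plain-Java sketch below decides whether a request would proceed; `IfMatchSketch` is a hypothetical helper, not connector code, and it simplifies by splitting the header on commas. When the precondition fails, HTTP servers answer with status 412 (Precondition Failed).

```java
// Hypothetical helper illustrating HTTP If-Match semantics; not part of Alpakka.
final class IfMatchSketch {

  // Returns true when an operation guarded by If-Match would be performed.
  // currentEtag is null when the resource does not exist.
  static boolean preconditionHolds(String ifMatch, String currentEtag) {
    if (currentEtag == null) {
      return false; // nothing to match against
    }
    if ("*".equals(ifMatch.trim())) {
      return true; // "*" matches any existing resource
    }
    // Simplification: assumes the ETag values themselves contain no commas
    for (String candidate : ifMatch.split(",")) {
      String tag = candidate.trim();
      // If-Match uses strong comparison, so weak tags (W/"...") never match
      if (!tag.startsWith("W/") && tag.equals(currentEtag)) {
        return true;
      }
    }
    return false;
  }
}
```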

Supported operations on Blob service

Create Container

The Create Container operation creates a new container under the specified account.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateContainer

val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.createContainer(containerName, CreateContainer())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.createContainer(containerName(), CreateContainer.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Delete Container

The Delete Container operation deletes an existing container under the specified account.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteContainer

val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.deleteContainer(containerName, DeleteContainer())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.deleteContainer(containerName(), DeleteContainer.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Put Block Blob

The Put Block Blob operation creates a new block blob or updates the content of an existing block blob.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.PutBlockBlob

val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.putBlockBlob(
    objectPath = s"$containerName/$blobName",
    payload = Source.single(ByteString(payload)),
    requestBuilder = PutBlockBlob(contentLength, ContentTypes.`text/plain(UTF-8)`)
  )

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        BlobService.putBlockBlob(containerName() + "/" + blobName(),
                PutBlockBlob.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8),
                Source.single(ByteString.fromString(payload())));

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Get Blob

The Get Blob operation reads or downloads a blob from the system, including its metadata and properties.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetBlob

val source: Source[ByteString, Future[ObjectMetadata]] =
  BlobService.getBlob(objectPath = s"$containerName/$blobName", GetBlob())

val eventualText = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
        BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create());

final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
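
In the examples above, Sink.seq collects the downloaded payload as a sequence of chunks rather than as a single string. When the content is text, it is safer to concatenate all chunks first and decode once, because a multi-byte character may be split across two chunks. The following plain-Java sketch shows the idea with byte arrays standing in for the chunks; `ChunkDecodingSketch` is a hypothetical helper and not part of the connector.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper, not part of the Alpakka API; byte[] stands in for a chunk.
final class ChunkDecodingSketch {

  // Concatenates all chunks and decodes the result as UTF-8 in one step,
  // so characters that straddle a chunk boundary are decoded correctly.
  static String decodeUtf8(List<byte[]> chunks) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    for (byte[] chunk : chunks) {
      buffer.write(chunk, 0, chunk.length);
    }
    return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
  }
}
```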

To download only a range of a blob’s data, set a ByteRange on the request builder by calling withRange.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetBlob

val source: Source[ByteString, Future[ObjectMetadata]] =
  BlobService.getBlob(objectPath = s"$containerName/$blobName", requestBuilder = GetBlob().withRange(subRange))

val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
        BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create().withRange(subRange()));

final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);

Get blob properties without downloading blob

The Get Blob Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the blob. (Note: the current implementation does not return user-defined metadata.)

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetProperties

val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.getProperties(objectPath = s"$containerName/$blobName", GetProperties())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        BlobService.getProperties(containerName() + "/" + blobName(), GetProperties.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Delete Blob

The Delete Blob operation deletes the specified blob.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteBlob

val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.deleteBlob(objectPath = s"$containerName/$blobName", DeleteBlob())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        BlobService.deleteBlob(containerName() + "/" + blobName(), DeleteBlob.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Supported operations on File service

Create File

The Create File operation creates a new file or replaces a file.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateFile

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.createFile(objectPath = s"$containerName/$blobName",
                         requestBuilder = CreateFile(contentLength, ContentTypes.`text/plain(UTF-8)`))

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        FileService.createFile(containerName() + "/" + blobName(),
                CreateFile.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8));

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Update Range

The Update Range operation writes a range of bytes to a file.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.UpdateFileRange

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.updateRange(
    objectPath = s"$containerName/$blobName",
    payload = Source.single(ByteString(payload)),
    requestBuilder = UpdateFileRange(contentRange, ContentTypes.`text/plain(UTF-8)`)
  )

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        FileService.updateRange(containerName() + "/" + blobName(),
                UpdateFileRange.create(contentRange(), ContentTypes.TEXT_PLAIN_UTF8),
                Source.single(ByteString.fromString(payload())));

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

A range can be cleared using the clearRange function.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.ClearFileRange

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.clearRange(objectPath = s"$containerName/$blobName", requestBuilder = ClearFileRange(subRange))

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        FileService.clearRange(containerName() + "/" + blobName(), ClearFileRange.create(subRange()));

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Create Directory

The Create Directory operation creates a new directory under the specified share or parent directory.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateDirectory

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.createDirectory(directoryPath = containerName, requestBuilder = CreateDirectory())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.createDirectory(containerName(), CreateDirectory.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Delete Directory

The Delete Directory operation removes the specified existing directory.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteDirectory

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.deleteDirectory(directoryPath = containerName, requestBuilder = DeleteDirectory())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.deleteDirectory(containerName(), DeleteDirectory.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Get File

The Get File operation reads or downloads a file from the system, including its metadata and properties.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetFile

val source: Source[ByteString, Future[ObjectMetadata]] =
  FileService.getFile(objectPath = s"$containerName/$blobName", GetFile())

val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
        FileService.getFile(containerName() + "/" + blobName(), GetFile.create());

final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);

Get file properties without downloading file

The Get File Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the file. (Note: the current implementation does not return user-defined metadata.)

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetProperties

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.getProperties(objectPath = s"$containerName/$blobName", GetProperties())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        FileService.getProperties(containerName() + "/" + blobName(), GetProperties.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);

Delete File

The Delete File operation immediately removes the file from the storage account.

Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteFile

val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.deleteFile(objectPath = s"$containerName/$blobName", DeleteFile())

val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
        FileService.deleteFile(containerName() + "/" + blobName(), DeleteFile.create());

final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);