Azure Storage
Azure Storage connector provides Akka Stream Source for Azure Storage. Currently only supports Blob
and File
services. For detail about these services please read Azure docs.
Project Info: Alpakka Azure Storage | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-azure-storage
9.0.0
|
JDK versions | Eclipse Temurin JDK 11 Eclipse Temurin JDK 17 |
Scala versions | 2.13.12, 3.3.3 |
JPMS module name | akka.stream.alpakka.azure.storage |
License | |
Readiness level |
Since 8.0.0, 2024-08-28
|
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project> ... <repositories> <repository> <id>akka-repository</id> <name>Akka library repository</name> <url>https://repo.akka.io/maven</url> </repository> </repositories> </project>
- Gradle
repositories { mavenCentral() maven { url "https://repo.akka.io/maven" } }
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.0" val AkkaHttpVersion = "10.7.0" libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage" % "9.0.0", "com.typesafe.akka" %% "akka-stream" % AkkaVersion, "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion, "com.typesafe.akka" %% "akka-http-xml" % AkkaHttpVersion )
- Maven
<properties> <akka.version>2.10.0</akka.version> <akka.http.version>10.7.0</akka.http.version> <scala.binary.version>2.13</scala.binary.version> </properties> <dependencies> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-azure-storage_${scala.binary.version}</artifactId> <version>9.0.0</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-stream_${scala.binary.version}</artifactId> <version>${akka.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http_${scala.binary.version}</artifactId> <version>${akka.http.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-http-xml_${scala.binary.version}</artifactId> <version>${akka.http.version}</version> </dependency> </dependencies>
- Gradle
def versions = [ AkkaVersion: "2.10.0", AkkaHttpVersion: "10.7.0", ScalaBinary: "2.13" ] dependencies { implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage_${versions.ScalaBinary}:9.0.0" implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}" implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}" implementation "com.typesafe.akka:akka-http-xml_${versions.ScalaBinary}:${versions.AkkaHttpVersion}" }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version com.typesafe.akka akka-http-xml_2.13 10.7.0 com.typesafe.akka akka-http_2.13 10.7.0 com.typesafe.akka akka-stream_2.13 2.10.0 org.scala-lang scala-library 2.13.12 - Dependency tree
com.typesafe.akka akka-http-xml_2.13 10.7.0 BUSL-1.1 com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1 com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1 com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1 com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0 com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.slf4j slf4j-api 2.0.16 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang.modules scala-xml_2.13 2.3.0 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1 com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1 com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1 com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0 com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.slf4j slf4j-api 2.0.16 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1 com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1 com.typesafe config 1.4.3 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0 com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1 org.reactivestreams reactive-streams 1.0.4 MIT-0 org.scala-lang scala-library 2.13.12 Apache-2.0 org.scala-lang scala-library 2.13.12 Apache-2.0
Configuration
The settings for the Azure Storage connector are read by default from alpakka.azure-storage
configuration section. Credentials are defined in credentials
section of reference.conf
.
- Scala
-
source
credentials { # valid values are anon (annonymous), SharedKey, and sas authorization-type = anon authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE} # required for all authorization types account-name = "" account-name = ${?AZURE_STORAGE_ACCOUNT_NAME} # Account key is required for SharedKey or SharedKeyLite authorization account-key = none account-key = ${?AZURE_STORAGE_ACCOUNT_KEY} # SAS token for sas authorization sas-token = "" sas-token = ${?AZURE_STORAGE_SAS_TOKEN} }
- Java
-
source
credentials { # valid values are anon (annonymous), SharedKey, and sas authorization-type = anon authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE} # required for all authorization types account-name = "" account-name = ${?AZURE_STORAGE_ACCOUNT_NAME} # Account key is required for SharedKey or SharedKeyLite authorization account-key = none account-key = ${?AZURE_STORAGE_ACCOUNT_KEY} # SAS token for sas authorization sas-token = "" sas-token = ${?AZURE_STORAGE_SAS_TOKEN} }
At minimum following configurations needs to be set:
authorization-type
, this is the type of authorization to use as described here, possible values areanon
,SharedKey
, orsas
. Environment variableAZURE_STORAGE_AUTHORIZATION_TYPE
can be set to override this configuration.account-name
, this is the name of the blob storage or file share. Environment variableAZURE_STORAGE_ACCOUNT_NAME
can be set to override this configuration.account-key
, Account key to use to create authorization signature, mandatory forSharedKey
orSharedKeyLite
authorization types, as described here. Environment variableAZURE_STORAGE_ACCOUNT_KEY
can be set to override this configuration.sas-token
if authorization type issas
. Environment variableAZURE_STORAGE_SAS_TOKEN
can be set to override this configuration.
Building request
Each function takes two parameters objectPath
and requestBuilder
. The objectPath
is a /
separated string of the path of the blob or file, for example, my-container/my-blob
or my-share/my-directory/my-file
.
Each request builder is subclass of RequestBuilder
which knows how to construct request for the given operation.
Create simple request builder with default values
In this example GetBlob
builder is initialized without any optional field.
- Scala
-
source
val requestBuilder = GetBlob()
- Java
-
source
final GetBlob requestBuilder = GetBlob.create();
Create request builder initialized with optional fields
In this example GetBlob
builder is initialized with given leaseId
and range
fields.
- Scala
-
source
val requestBuilder = GetBlob().withLeaseId("my-lease-id").withRange(ByteRange(0, 25))
- Java
-
source
final var requestBuilder = GetBlob.create().withLeaseId("my-lease-id").withRange(ByteRange.createSlice(0, 25));
Create request builder initialized with required fields
In this example CreateFile
builder is initialized with maxFileSize
and contentType
fields, which are required fields for CreateFile
operation.
- Scala
-
source
val requestBuilder = CreateFile(256L, ContentTypes.`text/plain(UTF-8)`)
- Java
-
source
final var requestBuilder = CreateFile.create(256L, ContentTypes.TEXT_PLAIN_UTF8);
Create request builder with ServerSideEncryption
ServerSideEncryption
can be initialized in similar fashion.
- Scala
-
source
val requestBuilder = PutBlockBlob(256L, ContentTypes.`text/plain(UTF-8)`) .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="))
- Java
-
source
final var requestBuilder = PutBlockBlob.create(256L, ContentTypes.TEXT_PLAIN_UTF8) .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="));
Create request builder with additional headers
Some operations allow you to add additional headers, for GetBlob
you can specify If-Match
header, which specify this header to perform the operation only if the resource’s ETag matches the value specified, this can be done by calling addHeader
function.
- Scala
-
source
val requestBuilder = GetBlob().addHeader("If-Match", "foobar")
- Java
-
source
final var requestBuilder = GetBlob.create().addHeader("If-Match", "foobar");
Supported operations on Blob service
Create Container
The Create Container
operation creates a new container under the specified account.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.CreateContainer val source: Source[Option[ObjectMetadata], NotUsed] = BlobService.createContainer(containerName, CreateContainer()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.createContainer(containerName(), CreateContainer.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Container
The Delete Container
operation creates existing container under the specified account.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.DeleteContainer val source: Source[Option[ObjectMetadata], NotUsed] = BlobService.deleteContainer(containerName, DeleteContainer()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.deleteContainer(containerName(), DeleteContainer.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Put Block Blob
The Put Block Blob
operation creates a new block or updates the content of an existing block blob.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.PutBlockBlob val source: Source[Option[ObjectMetadata], NotUsed] = BlobService.putBlockBlob( objectPath = s"$containerName/$blobName", payload = Source.single(ByteString(payload)), requestBuilder = PutBlockBlob(contentLength, ContentTypes.`text/plain(UTF-8)`) ) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.putBlockBlob(containerName() + "/" + blobName(), PutBlockBlob.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8), Source.single(ByteString.fromString(payload()))); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Get Blob
The Get Blob
operation reads or downloads a blob from the system, including its metadata and properties.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.GetBlob val source: Source[ByteString, Future[ObjectMetadata]] = BlobService.getBlob(objectPath = s"$containerName/$blobName", GetBlob()) val eventualText = source.toMat(Sink.seq)(Keep.right).run()
- Java
-
source
final Source<ByteString, CompletionStage<ObjectMetadata>> source = BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create()); final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
In order to download a range of a file’s data you can use overloaded method which additionally takes ByteRange
as argument.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.GetBlob val source: Source[ByteString, Future[ObjectMetadata]] = BlobService.getBlob(objectPath = s"$containerName/$blobName", requestBuilder = GetBlob().withRange(subRange)) val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
- Java
-
source
final Source<ByteString, CompletionStage<ObjectMetadata>> source = BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create().withRange(subRange())); final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
Get blob properties without downloading blob
The Get Blob Properties
operation returns all user-defined metadata, standard HTTP properties, and system properties for the blob. (**Note:** Current implementation does not return user-defined metadata.)
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.GetProperties val source: Source[Option[ObjectMetadata], NotUsed] = BlobService.getProperties(objectPath = s"$containerName/$blobName", GetProperties()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.getProperties(containerName() + "/" + blobName(), GetProperties.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Blob
The Delete Blob
operation deletes the specified blob.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.BlobService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.DeleteBlob val source: Source[Option[ObjectMetadata], NotUsed] = BlobService.deleteBlob(objectPath = s"$containerName/$blobName", DeleteBlob()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.deleteBlob(containerName() + "/" + blobName(), DeleteFile.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Supported operations on File service
Create File
The Create File
operation creates a new file or replaces a file.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.CreateFile val source: Source[Option[ObjectMetadata], NotUsed] = FileService.createFile(objectPath = s"$containerName/$blobName", requestBuilder = CreateFile(contentLength, ContentTypes.`text/plain(UTF-8)`)) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.createFile(containerName() + "/" + blobName(), CreateFile.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8)); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Update Range
The Update Range
operation writes a range of bytes to a file.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.UpdateFileRange val source: Source[Option[ObjectMetadata], NotUsed] = FileService.updateRange( objectPath = s"$containerName/$blobName", payload = Source.single(ByteString(payload)), requestBuilder = UpdateFileRange(contentRange, ContentTypes.`text/plain(UTF-8)`) ) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.updateRange(containerName() + "/" + blobName(), UpdateFileRange.create(contentRange(), ContentTypes.TEXT_PLAIN_UTF8), Source.single(ByteString.fromString(payload()))); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Range can be cleared using ClearRange
function.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.ClearFileRange val source: Source[Option[ObjectMetadata], NotUsed] = FileService.clearRange(objectPath = s"$containerName/$blobName", requestBuilder = ClearFileRange(subRange)) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.clearRange(containerName() + "/" + blobName(), ClearFileRange.create(subRange())); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Create Directory
The Create Directory
operation creates a new container under the specified account.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.CreateDirectory val source: Source[Option[ObjectMetadata], NotUsed] = FileService.createDirectory(directoryPath = containerName, requestBuilder = CreateDirectory()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.createDirectory(containerName(), CreateDirectory.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Directory
The Delete Directory
operation creates existing container under the specified account.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.DeleteDirectory val source: Source[Option[ObjectMetadata], NotUsed] = FileService.deleteDirectory(directoryPath = containerName, requestBuilder = DeleteDirectory()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.deleteDirectory(containerName(), DeleteDirectory.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Get File
The Get File
operation reads or downloads a file from the system, including its metadata and properties.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.GetFile val source: Source[ByteString, Future[ObjectMetadata]] = FileService.getFile(objectPath = s"$containerName/$blobName", GetFile()) val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
- Java
-
source
final Source<ByteString, CompletionStage<ObjectMetadata>> source = FileService.getFile(containerName() + "/" + blobName(), GetFile.create()); final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
Get file properties without downloading file
The Get File Properties
operation returns all user-defined metadata, standard HTTP properties, and system properties for the file. (**Note:** Current implementation does not return user-defined metatdata.)
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.GetProperties val source: Source[Option[ObjectMetadata], NotUsed] = FileService.getProperties(objectPath = s"$containerName/$blobName", GetProperties()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.getProperties(containerName() + "/" + blobName(), GetProperties.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Blob
The Delete File
operation immediately removes the file from the storage account.
- Scala
-
source
import akka.stream.alpakka.azure.storage.scaladsl.FileService import akka.stream.alpakka.azure.storage.ObjectMetadata import akka.stream.alpakka.azure.storage.requests.DeleteFile val source: Source[Option[ObjectMetadata], NotUsed] = FileService.deleteFile(objectPath = s"$containerName/$blobName", DeleteFile()) val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
- Java
-
source
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.deleteFile(containerName() + "/" + blobName(), DeleteFile.create()); final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);