Azure Storage
The Azure Storage connector provides Akka Stream Sources for Azure Storage. It currently supports only the Blob and File services. For details about these services, see the Azure documentation.
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-azure-storage" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-xml" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-azure-storage_${scala.binary.version}</artifactId>
    <version>9.0.1</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http-xml_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
    AkkaVersion: "2.10.0",
    AkkaHttpVersion: "10.7.0",
    ScalaBinary: "2.13"
]
dependencies {
    implementation "com.lightbend.akka:akka-stream-alpakka-azure-storage_${versions.ScalaBinary}:9.0.1"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
    implementation "com.typesafe.akka:akka-http-xml_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
Direct dependencies
Organization Artifact Version
com.typesafe.akka akka-http-xml_2.13 10.7.0
com.typesafe.akka akka-http_2.13 10.7.0
com.typesafe.akka akka-stream_2.13 2.10.0
org.scala-lang scala-library 2.13.12
Dependency tree
com.typesafe.akka akka-http-xml_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.slf4j slf4j-api 2.0.16
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang.modules scala-xml_2.13 2.3.0 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.slf4j slf4j-api 2.0.16
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
com.typesafe config 1.4.3 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
org.reactivestreams reactive-streams 1.0.4 MIT-0
org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
Configuration
By default, the settings for the Azure Storage connector are read from the alpakka.azure-storage configuration section. Credentials are defined in the credentials section of reference.conf.
credentials {
  # valid values are anon (anonymous), SharedKey, and sas
  authorization-type = anon
  authorization-type = ${?AZURE_STORAGE_AUTHORIZATION_TYPE}
  # required for all authorization types
  account-name = ""
  account-name = ${?AZURE_STORAGE_ACCOUNT_NAME}
  # Account key is required for SharedKey or SharedKeyLite authorization
  account-key = none
  account-key = ${?AZURE_STORAGE_ACCOUNT_KEY}
  # SAS token for sas authorization
  sas-token = ""
  sas-token = ${?AZURE_STORAGE_SAS_TOKEN}
}
At a minimum, the following settings must be configured:
authorization-type: the type of authorization to use, as described in the Azure documentation. Possible values are anon, SharedKey, or sas. Set the environment variable AZURE_STORAGE_AUTHORIZATION_TYPE to override this setting.
account-name: the name of the storage account that hosts the blob storage or file share. Set the environment variable AZURE_STORAGE_ACCOUNT_NAME to override this setting.
account-key: the account key used to create the authorization signature. It is mandatory for the SharedKey and SharedKeyLite authorization types, as described in the Azure documentation. Set the environment variable AZURE_STORAGE_ACCOUNT_KEY to override this setting.
sas-token: the shared access signature token, required if the authorization type is sas. Set the environment variable AZURE_STORAGE_SAS_TOKEN to override this setting.
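The rules above can be sketched in plain Java. This is only an illustration, not the connector's own validation: the class CredentialsCheck and its methods resolve and validate are hypothetical names. resolve mimics the pattern of a default followed by an optional environment substitution, where the environment value wins only when the variable is set.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not part of the connector) that mirrors the rules listed above. */
final class CredentialsCheck {

    /** Mimics "key = default" followed by "key = ${?ENV_VAR}": the env value wins only when set. */
    static String resolve(String defaultValue, String envValue) {
        return envValue != null ? envValue : defaultValue;
    }

    /** Returns an error message if a setting required by the authorization type is missing. */
    static Optional<String> validate(Map<String, String> credentials) {
        String type = credentials.getOrDefault("authorization-type", "anon");
        if (isBlank(credentials.get("account-name"))) {
            return Optional.of("account-name is required for all authorization types");
        }
        switch (type) {
            case "anon":
                return Optional.empty();
            case "SharedKey":
            case "SharedKeyLite": {
                String key = credentials.get("account-key");
                // "none" is the placeholder default shown in the credentials section.
                boolean missing = isBlank(key) || "none".equals(key);
                return missing ? Optional.of("account-key is required for " + type) : Optional.empty();
            }
            case "sas":
                return isBlank(credentials.get("sas-token"))
                    ? Optional.of("sas-token is required for sas")
                    : Optional.empty();
            default:
                return Optional.of("unknown authorization-type: " + type);
        }
    }

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> credentials = Map.of(
            "authorization-type", resolve("anon", System.getenv("AZURE_STORAGE_AUTHORIZATION_TYPE")),
            "account-name", resolve("", System.getenv("AZURE_STORAGE_ACCOUNT_NAME")),
            "account-key", resolve("none", System.getenv("AZURE_STORAGE_ACCOUNT_KEY")),
            "sas-token", resolve("", System.getenv("AZURE_STORAGE_SAS_TOKEN")));
        System.out.println(validate(credentials).orElse("credentials look complete"));
    }
}
```

Running a check like this at start-up, before any stream is materialized, surfaces a missing key or token as a configuration error rather than as a failed request.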
Building request
Each function takes an objectPath and a requestBuilder; functions that upload data additionally take a payload source. The objectPath is a /-separated string giving the path of the blob or file, for example my-container/my-blob or my-share/my-directory/my-file.
Each request builder is a subclass of RequestBuilder, which knows how to construct the request for the given operation.
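As a small illustration of the objectPath format, the following sketch joins and splits such paths with plain string handling. The class ObjectPathExample and its methods are hypothetical helpers for this page only; the connector simply takes the finished string.

```java
/** Hypothetical helper (not part of the connector) for working with "/"-separated object paths. */
final class ObjectPathExample {

    /** Joins segments such as ("my-share", "my-directory", "my-file") into one object path. */
    static String join(String... segments) {
        for (String segment : segments) {
            if (segment == null || segment.isEmpty()) {
                throw new IllegalArgumentException("object path segments must be non-empty");
            }
        }
        return String.join("/", segments);
    }

    /** Returns the first segment: the container (Blob service) or the share (File service). */
    static String root(String objectPath) {
        int slash = objectPath.indexOf('/');
        return slash < 0 ? objectPath : objectPath.substring(0, slash);
    }

    /** Returns everything after the first segment, or an empty string if there is none. */
    static String relativePath(String objectPath) {
        int slash = objectPath.indexOf('/');
        return slash < 0 ? "" : objectPath.substring(slash + 1);
    }

    public static void main(String[] args) {
        String path = join("my-share", "my-directory", "my-file");
        System.out.println(path + " -> root=" + root(path) + ", rest=" + relativePath(path));
    }
}
```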
Create simple request builder with default values
In this example, the GetBlob builder is initialized without any optional fields.
Scala
val requestBuilder = GetBlob()
Java
final GetBlob requestBuilder = GetBlob.create();
Create request builder initialized with optional fields
In this example, the GetBlob builder is initialized with the given leaseId and range fields.
Scala
val requestBuilder = GetBlob().withLeaseId("my-lease-id").withRange(ByteRange(0, 25))
Java
final var requestBuilder = GetBlob.create().withLeaseId("my-lease-id").withRange(ByteRange.createSlice(0, 25));
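To make the range in this example concrete: an HTTP byte range is inclusive on both ends, so the slice 0 to 25 covers 26 bytes. The sketch below renders such a slice in standard HTTP Range syntax. The class RangeHeaderExample is hypothetical, and the exact header name the connector sends for the range is not stated here, so treat the output only as an illustration of the syntax.

```java
/** Hypothetical helper (not part of the connector) showing HTTP byte-range syntax. */
final class RangeHeaderExample {

    /** Renders an inclusive byte slice in HTTP Range syntax, e.g. "bytes=0-25". */
    static String rangeValue(long first, long last) {
        if (first < 0 || last < first) {
            throw new IllegalArgumentException("expected 0 <= first <= last");
        }
        return "bytes=" + first + "-" + last;
    }

    /** Number of bytes covered by an inclusive slice. */
    static long length(long first, long last) {
        return last - first + 1;
    }

    public static void main(String[] args) {
        System.out.println(rangeValue(0, 25) + " covers " + length(0, 25) + " bytes");
    }
}
```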
Create request builder initialized with required fields
In this example, the CreateFile builder is initialized with the maxFileSize and contentType fields, which are required for the CreateFile operation.
Scala
val requestBuilder = CreateFile(256L, ContentTypes.`text/plain(UTF-8)`)
Java
final var requestBuilder = CreateFile.create(256L, ContentTypes.TEXT_PLAIN_UTF8);
Create request builder with ServerSideEncryption
ServerSideEncryption can be initialized in a similar fashion.
Scala
val requestBuilder = PutBlockBlob(256L, ContentTypes.`text/plain(UTF-8)`)
  .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="))
Java
final var requestBuilder = PutBlockBlob.create(256L, ContentTypes.TEXT_PLAIN_UTF8)
    .withServerSideEncryption(ServerSideEncryption.customerKey("SGVsbG9Xb3JsZA=="));
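The key passed to customerKey in this example is a Base64 string. The following sketch decodes it and derives a Base64-encoded SHA-256 digest of the raw key bytes, which Azure's customer-provided key scheme is assumed here to expect alongside the key itself. The class CustomerKeyExample is hypothetical, and whether the connector computes the digest for you is not stated on this page.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper (not part of the connector) for inspecting a Base64 customer key. */
final class CustomerKeyExample {

    /** Decodes the key as text, for display only; a real key is random bytes, not text. */
    static String decodeForDisplay(String base64Key) {
        return new String(Base64.getDecoder().decode(base64Key), StandardCharsets.UTF_8);
    }

    /** Base64-encoded SHA-256 digest of the raw (decoded) key bytes. */
    static String sha256OfKey(String base64Key) {
        try {
            byte[] rawKey = Base64.getDecoder().decode(base64Key);
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(rawKey);
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String key = "SGVsbG9Xb3JsZA==";
        System.out.println("decoded key: " + decodeForDisplay(key));
        System.out.println("key digest:  " + sha256OfKey(key));
    }
}
```

The sample key decodes to the ten-byte text HelloWorld, so it is a placeholder only. A key used with AES-256 would be 32 random bytes.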
Some operations allow you to add extra headers. For GetBlob, for example, you can specify the If-Match header so that the operation is performed only if the resource's ETag matches the given value. To do so, call the addHeader function.
Scala
val requestBuilder = GetBlob().addHeader("If-Match", "foobar")
Java
final var requestBuilder = GetBlob.create().addHeader("If-Match", "foobar");
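The effect of If-Match can be sketched as the check a server performs under standard HTTP conditional-request semantics: the request proceeds only when the supplied value matches the current ETag, and otherwise it is rejected with 412 Precondition Failed. The class IfMatchExample is hypothetical and models the server side only. It is not connector code.

```java
/** Hypothetical model of how a server evaluates If-Match; not part of the connector. */
final class IfMatchExample {

    static final int OK = 200;
    static final int PRECONDITION_FAILED = 412;

    /**
     * Returns the status a server would answer with.
     * ifMatch is the header value sent by the client (null if absent);
     * currentETag is the ETag of the stored resource (null if it does not exist).
     */
    static int evaluate(String ifMatch, String currentETag) {
        if (ifMatch == null) {
            return OK; // no precondition, the request proceeds
        }
        if (ifMatch.equals("*")) {
            return currentETag != null ? OK : PRECONDITION_FAILED; // "*" matches any existing resource
        }
        return ifMatch.equals(currentETag) ? OK : PRECONDITION_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(evaluate("foobar", "foobar")); // matching ETag
        System.out.println(evaluate("foobar", "changed")); // stale ETag
    }
}
```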
Supported operations on Blob service
Create Container
The Create Container operation creates a new container under the specified account.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateContainer
val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.createContainer(containerName, CreateContainer())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.createContainer(containerName(), CreateContainer.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Container
The Delete Container operation deletes an existing container under the specified account.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteContainer
val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.deleteContainer(containerName, DeleteContainer())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = BlobService.deleteContainer(containerName(), DeleteContainer.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Put Block Blob
The Put Block Blob operation creates a new block blob or updates the content of an existing block blob.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.PutBlockBlob
val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.putBlockBlob(
    objectPath = s"$containerName/$blobName",
    payload = Source.single(ByteString(payload)),
    requestBuilder = PutBlockBlob(contentLength, ContentTypes.`text/plain(UTF-8)`)
  )
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    BlobService.putBlockBlob(containerName() + "/" + blobName(),
        PutBlockBlob.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8),
        Source.single(ByteString.fromString(payload())));
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Get Blob
The Get Blob operation reads or downloads a blob from the system, including its metadata and properties.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetBlob
val source: Source[ByteString, Future[ObjectMetadata]] =
  BlobService.getBlob(objectPath = s"$containerName/$blobName", GetBlob())
val eventualText = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
    BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create());
final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
To download only a range of a blob's data, set a ByteRange on the request builder with withRange.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetBlob
val source: Source[ByteString, Future[ObjectMetadata]] =
  BlobService.getBlob(objectPath = s"$containerName/$blobName", requestBuilder = GetBlob().withRange(subRange))
val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
    BlobService.getBlob(containerName() + "/" + blobName(), GetBlob.create().withRange(subRange()));
final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
Get blob properties without downloading blob
The Get Blob Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the blob. (Note: the current implementation does not return user-defined metadata.)
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetProperties
val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.getProperties(objectPath = s"$containerName/$blobName", GetProperties())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    BlobService.getProperties(containerName() + "/" + blobName(), GetProperties.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Blob
The Delete Blob operation deletes the specified blob.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.BlobService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteBlob
val source: Source[Option[ObjectMetadata], NotUsed] =
  BlobService.deleteBlob(objectPath = s"$containerName/$blobName", DeleteBlob())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    BlobService.deleteBlob(containerName() + "/" + blobName(), DeleteBlob.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Supported operations on File service
Create File
The Create File operation creates a new file or replaces a file.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateFile
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.createFile(objectPath = s"$containerName/$blobName",
                         requestBuilder = CreateFile(contentLength, ContentTypes.`text/plain(UTF-8)`))
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    FileService.createFile(containerName() + "/" + blobName(),
        CreateFile.create(contentLength(), ContentTypes.TEXT_PLAIN_UTF8));
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Update Range
The Update Range operation writes a range of bytes to a file.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.UpdateFileRange
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.updateRange(
    objectPath = s"$containerName/$blobName",
    payload = Source.single(ByteString(payload)),
    requestBuilder = UpdateFileRange(contentRange, ContentTypes.`text/plain(UTF-8)`)
  )
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    FileService.updateRange(containerName() + "/" + blobName(),
        UpdateFileRange.create(contentRange(), ContentTypes.TEXT_PLAIN_UTF8),
        Source.single(ByteString.fromString(payload())));
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
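When the content is larger than what one request may carry, it has to be written as several consecutive ranges. The sketch below computes such inclusive ranges for a given total length. The class RangeChunksExample is hypothetical, and the per-request limit is passed in as a parameter because the actual limit is an assumption to check against the Azure documentation (4 MiB is used in main purely as an example value).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the connector) that splits a length into inclusive byte ranges. */
final class RangeChunksExample {

    /** Returns {first, last} pairs covering [0, totalLength) in chunks of at most maxChunk bytes. */
    static List<long[]> chunks(long totalLength, long maxChunk) {
        if (totalLength < 0 || maxChunk <= 0) {
            throw new IllegalArgumentException("expected totalLength >= 0 and maxChunk > 0");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long first = 0; first < totalLength; first += maxChunk) {
            long last = Math.min(first + maxChunk, totalLength) - 1; // inclusive end
            ranges.add(new long[] {first, last});
        }
        return ranges;
    }

    public static void main(String[] args) {
        long assumedLimit = 4L * 1024 * 1024; // assumption, not taken from this page
        for (long[] range : chunks(10L * 1024 * 1024, assumedLimit)) {
            System.out.println("bytes " + range[0] + "-" + range[1]);
        }
    }
}
```

Each pair could then back one content range passed to UpdateFileRange, with the matching slice of the payload as the source.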
A range can be cleared using the clearRange function.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.ClearFileRange
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.clearRange(objectPath = s"$containerName/$blobName", requestBuilder = ClearFileRange(subRange))
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    FileService.clearRange(containerName() + "/" + blobName(), ClearFileRange.create(subRange()));
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Create Directory
The Create Directory operation creates a new directory under the specified share or parent directory.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.CreateDirectory
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.createDirectory(directoryPath = containerName, requestBuilder = CreateDirectory())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.createDirectory(containerName(), CreateDirectory.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete Directory
The Delete Directory operation removes the specified empty directory.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteDirectory
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.deleteDirectory(directoryPath = containerName, requestBuilder = DeleteDirectory())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source = FileService.deleteDirectory(containerName(), DeleteDirectory.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Get File
The Get File operation reads or downloads a file from the system, including its metadata and properties.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetFile
val source: Source[ByteString, Future[ObjectMetadata]] =
  FileService.getFile(objectPath = s"$containerName/$blobName", GetFile())
val eventualText: Future[Seq[ByteString]] = source.toMat(Sink.seq)(Keep.right).run()
Java
final Source<ByteString, CompletionStage<ObjectMetadata>> source =
    FileService.getFile(containerName() + "/" + blobName(), GetFile.create());
final CompletionStage<List<ByteString>> eventualPayload = source.runWith(Sink.seq(), system);
Get file properties without downloading file
The Get File Properties operation returns all user-defined metadata, standard HTTP properties, and system properties for the file. (Note: the current implementation does not return user-defined metadata.)
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.GetProperties
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.getProperties(objectPath = s"$containerName/$blobName", GetProperties())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.toMat(Sink.head)(Keep.right).run()
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    FileService.getProperties(containerName() + "/" + blobName(), GetProperties.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);
Delete File
The Delete File operation immediately removes the file from the storage account.
Scala
import akka.stream.alpakka.azure.storage.scaladsl.FileService
import akka.stream.alpakka.azure.storage.ObjectMetadata
import akka.stream.alpakka.azure.storage.requests.DeleteFile
val source: Source[Option[ObjectMetadata], NotUsed] =
  FileService.deleteFile(objectPath = s"$containerName/$blobName", DeleteFile())
val eventualMaybeMetadata: Future[Option[ObjectMetadata]] = source.runWith(Sink.head)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    FileService.deleteFile(containerName() + "/" + blobName(), DeleteFile.create());
final CompletionStage<Optional<ObjectMetadata>> optionalCompletionStage = source.runWith(Sink.head(), system);