AWS S3
The AWS S3 connector provides Akka Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service and is an object storage service with a web service interface.
Project Info: Alpakka Amazon S3

| | |
|---|---|
| Artifact | com.lightbend.akka %% akka-stream-alpakka-s3 % 2.0.2 |
| JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
| Scala versions | 2.12.11, 2.11.12, 2.13.3 |
| JPMS module name | akka.stream.alpakka.aws.s3 |
| Readiness level | Since 0.2, 2016-10-28 |
| Home page | https://doc.akka.io/docs/alpakka/current |
| Release notes | In the documentation |
| Issues | Github issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt

```sbt
val AkkaVersion = "2.5.31"
val AkkaHttpVersion = "10.1.11"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "2.0.2",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-xml" % AkkaHttpVersion
)
```

- Maven

```xml
<properties>
  <akka.version>2.5.31</akka.version>
  <akka.http.version>10.1.11</akka.http.version>
  <scala.binary.version>2.12</scala.binary.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-s3_${scala.binary.version}</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_${scala.binary.version}</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-http_${scala.binary.version}</artifactId>
  <version>${akka.http.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-http-xml_${scala.binary.version}</artifactId>
  <version>${akka.http.version}</version>
</dependency>
```

- Gradle

```gradle
versions += [
  AkkaVersion: "2.5.31",
  AkkaHttpVersion: "10.1.11",
  ScalaBinary: "2.12"
]
dependencies {
  compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-s3_${versions.ScalaBinary}", version: '2.0.2',
  compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion,
  compile group: 'com.typesafe.akka', name: "akka-http_${versions.ScalaBinary}", version: versions.AkkaHttpVersion,
  compile group: 'com.typesafe.akka', name: "akka-http-xml_${versions.ScalaBinary}", version: versions.AkkaHttpVersion
}
```
The table below shows the direct dependencies of this module, and the second tab shows all libraries it depends on transitively.
- Direct dependencies
| Organization | Artifact | Version |
|---|---|---|
| com.fasterxml.jackson.core | jackson-core | 2.10.5 |
| com.fasterxml.jackson.core | jackson-databind | 2.10.5 |
| com.typesafe.akka | akka-http-xml_2.12 | 10.1.11 |
| com.typesafe.akka | akka-http_2.12 | 10.1.11 |
| com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
| org.scala-lang | scala-library | 2.12.11 |
| software.amazon.awssdk | auth | 2.11.14 |

- Dependency tree (transitive dependencies, deduplicated)

| Organization | Artifact | Version |
|---|---|---|
| com.fasterxml.jackson.core | jackson-annotations | 2.10.5 |
| com.fasterxml.jackson.core | jackson-core | 2.10.5 |
| com.fasterxml.jackson.core | jackson-databind | 2.10.5 |
| com.typesafe | config | 1.3.3 |
| com.typesafe | ssl-config-core_2.12 | 0.3.8 |
| com.typesafe.akka | akka-actor_2.12 | 2.5.31 |
| com.typesafe.akka | akka-http-core_2.12 | 10.1.11 |
| com.typesafe.akka | akka-http-xml_2.12 | 10.1.11 |
| com.typesafe.akka | akka-http_2.12 | 10.1.11 |
| com.typesafe.akka | akka-parsing_2.12 | 10.1.11 |
| com.typesafe.akka | akka-protobuf_2.12 | 2.5.31 |
| com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
| org.reactivestreams | reactive-streams | 1.0.2 |
| org.scala-lang | scala-library | 2.12.11 |
| org.scala-lang.modules | scala-java8-compat_2.12 | 0.8.0 |
| org.scala-lang.modules | scala-parser-combinators_2.12 | 1.1.2 |
| org.scala-lang.modules | scala-xml_2.12 | 1.2.0 |
| org.slf4j | slf4j-api | 1.7.28 |
| software.amazon.awssdk | annotations | 2.11.14 |
| software.amazon.awssdk | auth | 2.11.14 |
| software.amazon.awssdk | http-client-spi | 2.11.14 |
| software.amazon.awssdk | profiles | 2.11.14 |
| software.amazon.awssdk | regions | 2.11.14 |
| software.amazon.awssdk | sdk-core | 2.11.14 |
| software.amazon.awssdk | utils | 2.11.14 |
| software.amazon.eventstream | eventstream | 1.0.1 |
Configuration
The settings for the S3 connector are read by default from the `alpakka.s3` configuration section. Credentials are loaded as described in the DefaultCredentialsProvider documentation. Therefore, if you are using the Alpakka S3 connector in a standard environment, no configuration changes should be necessary. However, if you use a non-standard configuration path or need multiple different configurations, please refer to the attributes section below to see how to apply different configurations to different parts of the stream. All of the available configuration settings can be found in reference.conf.
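As an illustration, a non-default setup could override a few of these settings in `application.conf`. The sketch below assumes an S3-compatible endpoint such as Minio; the endpoint URL and credential values are placeholders, and `reference.conf` remains the authoritative list of keys:

```hocon
alpakka.s3 {
  # Point the connector at an S3-compatible endpoint (placeholder URL)
  endpoint-url = "http://localhost:9000"
  path-style-access = true

  aws {
    credentials {
      provider = static
      access-key-id = "my-access-key"     # placeholder
      secret-access-key = "my-secret-key" # placeholder
    }
    region {
      provider = static
      default-region = "us-east-1"
    }
  }
}
```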
Store a file in S3
A file can be uploaded to S3 by creating a source of `ByteString` and running it with a sink created from `S3.multipartUpload`.
- Scala
```scala
val file: Source[ByteString, NotUsed] =
  Source.single(ByteString(body))

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload(bucket, bucketKey)

val result: Future[MultipartUploadResult] =
  file.runWith(s3Sink)
```
- Java
```java
final Source<ByteString, NotUsed> file = Source.single(ByteString.fromString(body()));

final Sink<ByteString, CompletionStage<MultipartUploadResult>> sink =
    S3.multipartUpload(bucket(), bucketKey());

final CompletionStage<MultipartUploadResult> resultCompletionStage =
    file.runWith(sink, materializer);
```
Download a file from S3
A source for downloading a file can be created by calling `S3.download`. It will emit an `Option` (`Optional` in Java) that holds the file's data and metadata, or is empty if no such file can be found.
- Scala
```scala
val s3File: Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] =
  S3.download(bucket, bucketKey)

val Some((data: Source[ByteString, _], metadata)) =
  s3File.runWith(Sink.head).futureValue

val result: Future[String] =
  data.map(_.utf8String).runWith(Sink.head)
```
- Java
```java
final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed> sourceAndMeta =
    S3.download(bucket(), bucketKey());

final Pair<Source<ByteString, NotUsed>, ObjectMetadata> dataAndMetadata =
    sourceAndMeta
        .runWith(Sink.head(), materializer)
        .toCompletableFuture()
        .get(5, TimeUnit.SECONDS)
        .get();

final Source<ByteString, NotUsed> data = dataAndMetadata.first();
final ObjectMetadata metadata = dataAndMetadata.second();

final CompletionStage<String> resultCompletionStage =
    data.map(ByteString::utf8String).runWith(Sink.head(), materializer);

String result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);
```
To download a range of a file's data you can use the overloaded method which additionally takes a `ByteRange` as an argument.
- Scala
```scala
val downloadResult =
  S3.download(bucket, bucketKey, Some(ByteRange(bytesRangeStart, bytesRangeEnd)))
```
- Java
```java
final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed> sourceAndMeta =
    S3.download(bucket(), bucketKey(), ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));
```
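Byte ranges are inclusive on both ends. As a plain-Scala illustration (not part of the Alpakka API), the ranges needed to fetch an object of a given size in fixed-size slices can be computed like this:

```scala
// Inclusive (first, last) byte ranges covering `size` bytes in `chunk`-sized slices.
// Plain Scala helper for illustration only; not part of the Alpakka API.
def byteRanges(size: Long, chunk: Long): Seq[(Long, Long)] =
  (0L until size by chunk).map(start => (start, math.min(start + chunk, size) - 1))
```

Each resulting pair could then be turned into a `ByteRange` for a separate download call.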
File metadata (`ObjectMetadata`) holds the content type, size and other useful information about the object. Here's an example of using this metadata to stream an object back to a client in Akka HTTP.
- Scala
```scala
HttpResponse(
  entity = HttpEntity(
    metadata.contentType
      .flatMap(ContentType.parse(_).right.toOption)
      .getOrElse(ContentTypes.`application/octet-stream`),
    metadata.contentLength,
    data
  )
)
```
- Java
```java
HttpResponse.create()
    .withEntity(
        HttpEntities.create(
            metadata
                .getContentType()
                .map(ct -> ContentTypes.parse(ct))
                .orElse(ContentTypes.APPLICATION_OCTET_STREAM),
            metadata.getContentLength(),
            data));
```
Access object metadata without downloading object from S3
If you do not need the object itself, you can query for only the object metadata using a source from `S3.getObjectMetadata`.
- Scala
```scala
val metadata: Source[Option[ObjectMetadata], NotUsed] =
  S3.getObjectMetadata(bucket, bucketKey)
```
- Java
```java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    S3.getObjectMetadata(bucket(), bucketKey());
```
List bucket contents
To get a list of all objects in a bucket, use `S3.listBucket`. When run, this will give a stream of `ListBucketResultContents`.
- Scala
```scala
val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
```
- Java
```java
final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Optional.of(prefix));
```
List bucket contents and common prefixes
To get a list of the contents and common prefixes for one hierarchy level using a delimiter, use `S3.listBucketAndCommonPrefixes`. When run, this will give a stream of tuples `(Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes])`.
- Scala
```scala
val keyAndCommonPrefixSource
    : Source[(Seq[ListBucketResultContents], Seq[ListBucketResultCommonPrefixes]), NotUsed] =
  S3.listBucketAndCommonPrefixes(bucket, listDelimiter, Some(listPrefix))
```
- Java
```java
final Source<Pair<List<ListBucketResultContents>, List<ListBucketResultCommonPrefixes>>, NotUsed>
    keySource =
        S3.listBucketAndCommonPrefixes(bucket(), delimiter, Optional.of(prefix), S3Headers.empty());
```
Copy upload (multi part)
Copy an S3 object from a source bucket to a target bucket using `S3.multipartCopy`. When run, this will emit a single `MultipartUploadResult` with the information about the copied object.
- Scala
```scala
val result: Future[MultipartUploadResult] =
  S3.multipartCopy(bucket, bucketKey, targetBucket, targetBucketKey).run()
```
- Java
```java
final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(bucket, sourceKey, targetBucket, targetKey).run(materializer);
```
If your bucket has versioning enabled, you could have multiple versions of the same object. By default, AWS identifies the current version of the object to copy. You can optionally copy a specific version of the source object by adding the `sourceVersionId` parameter.
- Scala
```scala
val result: Future[MultipartUploadResult] =
  S3.multipartCopy(
      bucket,
      bucketKey,
      targetBucket,
      targetBucketKey,
      sourceVersionId = Some("3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"))
    .run()
```
- Java
```java
String sourceVersionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo";

final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(
            bucket, sourceKey, targetBucket, targetKey, Optional.of(sourceVersionId), S3Headers.create())
        .run(materializer);
```
Different options for server-side encryption are available in the `ServerSideEncryption` factory.
- Scala
```scala
val keys = ServerSideEncryption
  .customerKeys(sseCustomerKey)
  .withMd5(sseCustomerMd5Key)

val result: Future[MultipartUploadResult] =
  S3.multipartCopy(
      bucket,
      bucketKey,
      targetBucket,
      targetBucketKey,
      s3Headers = S3Headers().withServerSideEncryption(keys))
    .run()
```
- Java
```java
final CustomerKeys keys =
    ServerSideEncryption.customerKeys(sseCustomerKey()).withMd5(sseCustomerMd5Key());

final CompletionStage<MultipartUploadResult> resultCompletionStage =
    S3.multipartCopy(
            bucket(),
            bucketKey(),
            targetBucket(),
            targetBucketKey(),
            S3Headers.create().withServerSideEncryption(keys))
        .run(materializer);
```
More S3-specific headers and arbitrary HTTP headers can be specified by adding them to the `S3Headers` container.
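As a sketch of what that can look like in Scala (the canned ACL, metadata key and header value below are made-up examples, not defaults):

```scala
import akka.stream.alpakka.s3.{MetaHeaders, S3Headers}
import akka.stream.alpakka.s3.headers.CannedAcl

val headers: S3Headers =
  S3Headers()
    .withCannedAcl(CannedAcl.Private)                       // S3-specific header
    .withMetaHeaders(MetaHeaders(Map("purpose" -> "demo"))) // x-amz-meta-* headers
    .withCustomHeaders(Map("Cache-Control" -> "no-cache"))  // arbitrary HTTP header
```

The resulting container can then be passed to the methods that take an `S3Headers` argument, as in the copy examples above.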
Apply S3 settings to a part of the stream
It is possible to make one part of the stream use a different `S3Settings` instance from the rest of the graph. This can be useful when one stream is used to copy files across regions or even between different S3-compatible endpoints. You can attach a custom `S3Settings` instance or a custom config path to a graph using attributes from `S3Attributes`:
- Scala
```scala
val useVersion1Api = S3Ext(system).settings
  .withListBucketApiVersion(ApiVersion.ListBucketVersion1)

val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
    .withAttributes(S3Attributes.settings(useVersion1Api))
```
- Java
```java
final S3Settings useVersion1Api =
    S3Ext.get(system()).settings().withListBucketApiVersion(ApiVersion.getListBucketVersion1());

final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Optional.of(prefix))
        .withAttributes(S3Attributes.settings(useVersion1Api));
```
Bucket management
The bucket management API provides functionality for both Sources and Futures / CompletionStages. In the case of the Future API, the user can specify attributes for the request in the method itself; for Sources, this can be done via `.withAttributes`. For more information about attributes see `S3Attributes` and `Attributes`.
Make bucket
To create a bucket in S3 you need to specify a name that is globally unique and complies with the S3 bucket-naming requirements. The bucket will be created in the region specified in the settings.
- Scala
```scala
val bucketName = "samplebucket1"

implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val makeBucketRequest: Future[Done] = S3.makeBucket(bucketName)
val makeBucketSourceRequest: Source[Done, NotUsed] = S3.makeBucketSource(bucketName)
```
- Java
```java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);
final String bucketName = "samplebucket1";

CompletionStage<Done> makeBucketRequest = S3.makeBucket(bucketName, materializer);
CompletionStage<Done> makeBucketRequestWithAttributes =
    S3.makeBucket(bucketName, materializer, sampleAttributes);
Source<Done, NotUsed> makeBucketSourceRequest = S3.makeBucketSource(bucketName);
```
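Bucket names must be 3 to 63 characters of lowercase letters, digits, dots and hyphens, starting and ending with a letter or digit. A rough pre-flight check in plain Scala could look like the following; this is an illustration only, not part of the Alpakka API, and no substitute for S3's own validation:

```scala
// Rough check of the common S3 bucket-naming rules; illustration only.
def isValidBucketName(name: String): Boolean =
  name.matches("^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$") && !name.contains("..")
```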
Delete bucket
To delete a bucket you need to specify its name and the bucket needs to be empty.
- Scala
```scala
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val deleteBucketRequest: Future[Done] = S3.deleteBucket(bucketName)
val deleteBucketSourceRequest: Source[Done, NotUsed] = S3.deleteBucketSource(bucketName)
```
- Java
```java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);

CompletionStage<Done> deleteBucketRequest = S3.deleteBucket(bucketName, materializer);
CompletionStage<Done> deleteBucketRequestWithAttributes =
    S3.deleteBucket(bucketName, materializer, sampleAttributes);
Source<Done, NotUsed> deleteBucketSourceRequest = S3.deleteBucketSource(bucketName);
```
Check if bucket exists
It is possible to check if a bucket exists and whether the user has the rights to perform a `listBucket` operation.
There are 3 possible outcomes:

- The user has access to the existing bucket: `AccessGranted` is returned
- The bucket exists but the user doesn't have access to it: `AccessDenied` is returned
- The bucket doesn't exist: `NotExists` is returned
- Scala
```scala
implicit val sampleAttributes: Attributes = S3Attributes.settings(sampleSettings)

val doesntExistRequest: Future[BucketAccess] = S3.checkIfBucketExists(bucket)
val doesntExistSourceRequest: Source[BucketAccess, NotUsed] =
  S3.checkIfBucketExistsSource(bucket)
```
- Java
```java
final Attributes sampleAttributes = S3Attributes.settings(sampleSettings);

final CompletionStage<BucketAccess> doesntExistRequest =
    S3.checkIfBucketExists(bucket(), materializer);
final CompletionStage<BucketAccess> doesntExistRequestWithAttributes =
    S3.checkIfBucketExists(bucket(), materializer, sampleAttributes);
final Source<BucketAccess, NotUsed> doesntExistSourceRequest =
    S3.checkIfBucketExistsSource(bucket());
```
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
```
sbt > s3/test
```
- Java
```
sbt > s3/test
```
Some test code requires Minio running in the background. You can start it quickly using Docker:

```
docker-compose up minio_prep
```