AWS S3

The AWS S3 connector provides Akka Stream sources and sinks to connect to Amazon S3. S3 stands for Simple Storage Service and is an object storage service with a web service interface.

Project Info: Alpakka Amazon S3
Artifact: com.lightbend.akka : akka-stream-alpakka-s3 : 1.0-M2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.aws.s3
Readiness level: Community-driven, since 0.2, 2016-10-28
Home page: https://doc.akka.io/docs/alpakka/current/
Release notes: In the documentation
Issues: Github issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-s3" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-s3_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-s3_2.12', version: '1.0-M2'
}

The table below lists the direct dependencies of this module; the dependency tree that follows shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.amazonaws    aws-java-sdk-core    1.11.476    Apache License, Version 2.0
com.typesafe.akka    akka-http-xml_2.12    10.1.7    Apache-2.0
com.typesafe.akka    akka-http_2.12    10.1.7    Apache-2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.amazonaws    aws-java-sdk-core    1.11.476    Apache License, Version 2.0
    com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
    com.fasterxml.jackson.dataformat    jackson-dataformat-cbor    2.6.7    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
    commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
    joda-time    joda-time    2.8.1    Apache 2
    org.apache.httpcomponents    httpclient    4.5.5    Apache License, Version 2.0
        commons-codec    commons-codec    1.10    Apache License, Version 2.0
        commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
        org.apache.httpcomponents    httpcore    4.4.9    Apache License, Version 2.0
    software.amazon.ion    ion-java    1.0.2    The Apache License, Version 2.0
com.typesafe.akka    akka-http-xml_2.12    10.1.7    Apache-2.0
    com.typesafe.akka    akka-http_2.12    10.1.7    Apache-2.0
        com.typesafe.akka    akka-http-core_2.12    10.1.7    Apache-2.0
            com.typesafe.akka    akka-parsing_2.12    10.1.7    Apache-2.0
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang.modules    scala-xml_2.12    1.1.1    BSD 3-clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-http_2.12    10.1.7    Apache-2.0
    com.typesafe.akka    akka-http-core_2.12    10.1.7    Apache-2.0
        com.typesafe.akka    akka-parsing_2.12    10.1.7    Apache-2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Configuration

The settings for the S3 connector are read by default from the alpakka.s3 configuration section. Credentials are loaded as described in the DefaultAWSCredentialsProviderChain documentation. Therefore, if you are using the Alpakka S3 connector in a standard environment, no configuration changes should be necessary. However, if you use a non-standard configuration path or need multiple different configurations, please refer to the attributes section below to see how to apply different configurations to different parts of the stream. All of the available configuration settings can be found in reference.conf.
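
Credentials can also be set programmatically by deriving a modified S3Settings instance from the loaded defaults and applying it with the attributes described below. The following is a minimal sketch only: the import paths and the withCredentialsProvider method are assumptions to verify against the API documentation for your version.

Scala
// Illustrative sketch: start from the settings loaded from the alpakka.s3 section
// and replace the credentials provider. The import paths and withCredentialsProvider
// are assumptions, not taken verbatim from the Alpakka sources.
import akka.actor.ActorSystem
import akka.stream.alpakka.s3.S3Ext
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}

implicit val system: ActorSystem = ActorSystem()

val customSettings =
  S3Ext(system).settings
    .withCredentialsProvider(
      new AWSStaticCredentialsProvider(new BasicAWSCredentials("my-access-key-id", "my-secret-key")))
// Apply `customSettings` to a part of the stream with S3Attributes.settings,
// as shown in the attributes section below.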

Store a file in S3

A file can be uploaded to S3 by creating a source of ByteString and running it with a sink created from S3.multipartUpload.

Scala
val file: Source[ByteString, NotUsed] =
  Source.single(ByteString(body))

val s3Sink: Sink[ByteString, Source[MultipartUploadResult, NotUsed]] =
  S3.multipartUpload(bucket, bucketKey)

val result: Source[MultipartUploadResult, NotUsed] =
  file.runWith(s3Sink)
Java
final Source<ByteString, NotUsed> file = Source.single(ByteString.fromString(body()));

final Sink<ByteString, Source<MultipartUploadResult, NotUsed>> sink =
    S3.multipartUpload(bucket(), bucketKey());

final Source<MultipartUploadResult, NotUsed> result =
    file.runWith(sink, materializer);
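
Note that the sink materializes into a Source of the upload result; running that source (for example with Sink.head) yields the final MultipartUploadResult once the upload has completed. A minimal Scala sketch continuing the snippet above, assuming an implicit materializer and the usual akka.stream.scaladsl imports are in scope:

Scala
// Run the materialized result source to obtain the outcome of the upload.
val uploadResult: Future[MultipartUploadResult] =
  result.runWith(Sink.head)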

Download a file from S3

A source for downloading a file can be created by calling S3.download. It will emit an Option (Optional in the Java API) holding the file’s data and metadata, or an empty value if no such file can be found.

Scala
val s3File: Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed] =
  S3.download(bucket, bucketKey)

val Some((data: Source[ByteString, _], metadata)) =
  s3File.runWith(Sink.head).futureValue

val result: Future[String] =
  data.map(_.utf8String).runWith(Sink.head)
Java
final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed>
    sourceAndMeta = S3.download(bucket(), bucketKey());
final Pair<Source<ByteString, NotUsed>, ObjectMetadata> dataAndMetadata =
    sourceAndMeta
        .runWith(Sink.head(), materializer)
        .toCompletableFuture()
        .get(5, TimeUnit.SECONDS)
        .get();

final Source<ByteString, NotUsed> data = dataAndMetadata.first();
final ObjectMetadata metadata = dataAndMetadata.second();

final CompletionStage<String> resultCompletionStage =
    data.map(ByteString::utf8String).runWith(Sink.head(), materializer);

String result = resultCompletionStage.toCompletableFuture().get(5, TimeUnit.SECONDS);

In order to download a range of a file’s data you can use the overloaded method which additionally takes a ByteRange as an argument.

Scala
val downloadResult = S3.download(bucket, bucketKey, Some(ByteRange(bytesRangeStart, bytesRangeEnd)))
Java
final Source<Optional<Pair<Source<ByteString, NotUsed>, ObjectMetadata>>, NotUsed>
    sourceAndMeta =
        S3.download(
            bucket(), bucketKey(), ByteRange.createSlice(bytesRangeStart(), bytesRangeEnd()));

File metadata (ObjectMetadata) holds the content type, size and other useful information about the object. Here’s an example of using this metadata to stream an object back to a client in Akka HTTP.

Scala
HttpResponse(
  entity = HttpEntity(
    metadata.contentType
      .flatMap(ContentType.parse(_).right.toOption)
      .getOrElse(ContentTypes.`application/octet-stream`),
    metadata.contentLength,
    data
  )
)
Java
HttpResponse.create()
    .withEntity(
        HttpEntities.create(
            metadata
                .getContentType()
                .map(ct -> ContentTypes.parse(ct))
                .orElse(ContentTypes.APPLICATION_OCTET_STREAM),
            metadata.getContentLength(),
            data));

Access object metadata without downloading object from S3

If you do not need the object itself, you can query for the object metadata only, using a source from S3.getObjectMetadata.

Scala
val metadata: Source[Option[ObjectMetadata], NotUsed] =
  S3.getObjectMetadata(bucket, bucketKey)
Java
final Source<Optional<ObjectMetadata>, NotUsed> source =
    S3.getObjectMetadata(bucket(), bucketKey());

List bucket contents

To get a list of all objects in a bucket, use S3.listBucket. When run, this will give a stream of ListBucketResultContents.

Scala
val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
Java
final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Option.apply(listPrefix()));

Copy upload (multi part)

Copy an S3 object from a source bucket to a target bucket using S3.multipartCopy. When run, this will emit a single MultipartUploadResult with the information about the copied object.

Scala
val result: Source[MultipartUploadResult, NotUsed] =
  S3.multipartCopy(bucket, bucketKey, targetBucket, targetBucketKey).run()
Java
final Source<MultipartUploadResult, NotUsed> result =
    S3.multipartCopy(bucket, sourceKey, targetBucket, targetKey).run(materializer);

If your bucket has versioning enabled, you could have multiple versions of the same object. By default, AWS identifies and copies the current version of the object. You can optionally copy a specific version of the source object by adding the sourceVersionId parameter.

Scala
val result: Source[MultipartUploadResult, NotUsed] =
  S3.multipartCopy(bucket,
                   bucketKey,
                   targetBucket,
                   targetBucketKey,
                   sourceVersionId = Some("3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo"))
    .run()
Java
String sourceVersionId = "3/L4kqtJlcpXroDTDmJ+rmSpXd3dIbrHY+MTRCxf3vjVBH40Nr8X8gdRQBpUMLUo";
final Source<MultipartUploadResult, NotUsed> result =
    S3.multipartCopy(
            bucket,
            sourceKey,
            targetBucket,
            targetKey,
            Optional.of(sourceVersionId),
            S3Headers.create())
        .run(materializer);

Different options are available for server-side encryption in the ServerSideEncryption factory.

Scala
val keys = ServerSideEncryption
  .customerKeys(sseCustomerKey)
  .withMd5(sseCustomerMd5Key)

val result: Source[MultipartUploadResult, NotUsed] =
  S3.multipartCopy(bucket,
                   bucketKey,
                   targetBucket,
                   targetBucketKey,
                   s3Headers = S3Headers().withServerSideEncryption(keys))
    .run()
Java
final CustomerKeys keys =
    ServerSideEncryption.customerKeys(sseCustomerKey()).withMd5(sseCustomerMd5Key());

final Source<MultipartUploadResult, NotUsed> result =
    S3.multipartCopy(
            bucket(),
            bucketKey(),
            targetBucket(),
            targetBucketKey(),
            S3Headers.create().withServerSideEncryption(keys))
        .run(materializer);

More S3-specific headers and arbitrary HTTP headers can be specified by adding them to the S3Headers container.
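
For example, a canned ACL and user metadata could be combined into a single S3Headers value and passed to a copy in the same way as the encryption headers above. This is a sketch only: CannedAcl, MetaHeaders, withCannedAcl and withMetaHeaders are assumed API names to check against the API documentation.

Scala
// Sketch: combine several kinds of headers in one S3Headers value.
// CannedAcl, MetaHeaders, withCannedAcl and withMetaHeaders are assumed names.
val richHeaders = S3Headers()
  .withCannedAcl(CannedAcl.Private)
  .withMetaHeaders(MetaHeaders(Map("origin" -> "alpakka-docs")))

val copyWithHeaders: Source[MultipartUploadResult, NotUsed] =
  S3.multipartCopy(bucket, bucketKey, targetBucket, targetBucketKey, s3Headers = richHeaders)
    .run()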

Apply S3 settings to a part of the stream

It is possible to make one part of the stream use a different S3Settings instance from the rest of the graph. This can be useful when one stream is used to copy files across regions or even between different S3-compatible endpoints. You can attach a custom S3Settings instance or a custom config path to a graph using attributes from S3Attributes:

Scala
val useVersion1Api = S3Ext(system).settings
  .withListBucketApiVersion(ApiVersion.ListBucketVersion1)

val keySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
    .withAttributes(S3Attributes.settings(useVersion1Api))
Java
final S3Settings useVersion1Api =
    S3Ext.get(system()).settings().withListBucketApiVersion(ApiVersion.getListBucketVersion1());

final Source<ListBucketResultContents, NotUsed> keySource =
    S3.listBucket(bucket(), Option.apply(listPrefix()))
        .withAttributes(S3Attributes.settings(useVersion1Api));
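
Instead of constructing the S3Settings instance in code, the attribute can also point at a custom configuration path, as mentioned above. The following is a sketch only: the S3Attributes.settingsPath factory and the other-aws.s3 configuration section are assumptions to verify against the API documentation.

Scala
// Sketch: read an independent set of settings from a non-default config section.
// S3Attributes.settingsPath and the `other-aws.s3` section are assumptions.
val otherKeySource: Source[ListBucketResultContents, NotUsed] =
  S3.listBucket(bucket, Some(listPrefix))
    .withAttributes(S3Attributes.settingsPath("other-aws.s3"))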

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

sbt
> s3/test

Some test code requires Minio running in the background. You can start it quickly using Docker:

docker-compose up minio_prep
