Shared AWS client configuration

Underlying HTTP client

Scala
import com.github.matsluni.akkahttpspi.AkkaHttpClient
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest

// Don't encode credentials in your source code!
// see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
val credentialsProvider = StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))
implicit val awsSqsClient: SqsAsyncClient = SqsAsyncClient
  .builder()
  .credentialsProvider(credentialsProvider)
  .region(Region.EU_CENTRAL_1)
  .httpClient(AkkaHttpClient.builder().withActorSystem(system).build())
  // Possibility to configure the retry policy
  // see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
  // .overrideConfiguration(...)
  .build()

system.registerOnTermination(awsSqsClient.close())
Java
import com.github.matsluni.akkahttpspi.AkkaHttpClient;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

// Don't encode credentials in your source code!
// see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
StaticCredentialsProvider credentialsProvider =
    StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"));
SqsAsyncClient sqsClient =
    SqsAsyncClient.builder()
        .credentialsProvider(credentialsProvider)
        .region(Region.EU_CENTRAL_1)
        .httpClient(AkkaHttpClient.builder().withActorSystem(system).build())
        // Possibility to configure the retry policy
        // see https://doc.akka.io/docs/alpakka/current/aws-shared-configuration.html
        // .overrideConfiguration(...)
        .build();

system.registerOnTermination(() -> sqsClient.close());

The example snippets show how the AWS clients are set up to use Akka HTTP as the default HTTP client implementation via the thin adapter library AWS Akka-Http SPI. Setting httpClient explicitly (as above) reuses the existing Akka actor system. If it is not set explicitly, a separate actor system is created internally.

Using Netty

Netty, which is Amazon’s default, can be used instead. Add an appropriate Netty version to the dependencies and configure NettyNioAsyncHttpClient.

Scala
import software.amazon.awssdk.http.async.SdkAsyncHttpClient
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient

val customClient: SdkAsyncHttpClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build()
implicit val customSqsClient: SqsAsyncClient = SqsAsyncClient
  .builder()
  .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
  .region(Region.EU_CENTRAL_1)
  .httpClient(customClient)
  .build()

system.registerOnTermination(customSqsClient.close())
Java
import java.net.URI;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;

SdkAsyncHttpClient customClient = NettyNioAsyncHttpClient.builder().maxConcurrency(100).build();
final SqsAsyncClient customSqsClient =
    SqsAsyncClient.builder()
        .credentialsProvider(
            StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
        // sqsEndpoint is assumed to be defined elsewhere;
        // omit this call to use the region's default endpoint
        .endpointOverride(URI.create(sqsEndpoint))
        .region(Region.US_WEST_2)
        .httpClient(customClient)
        .build();

system.registerOnTermination(() -> customSqsClient.close());

Make sure to configure a large enough thread pool for the Netty client to avoid resource starvation. This is especially important if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources, the sum of all parallelism (Source) and maxInFlight (Sink) settings must be less than or equal to the thread pool size.
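The sizing rule above can be checked before the streams are wired up. The following is a minimal sketch and not part of Alpakka or the AWS SDK: the class NettyCapacityCheck and its methods are hypothetical names, and it assumes the relevant limit is the value passed to maxConcurrency in the snippets above.

```java
/** Hypothetical helper, not part of Alpakka or the AWS SDK. */
final class NettyCapacityCheck {

    /** Sums the parallelism of all SQS sources and the maxInFlight of all SQS sinks. */
    static long totalDemand(int[] sourceParallelism, int[] sinkMaxInFlight) {
        long total = 0;
        for (int p : sourceParallelism) {
            total += p;
        }
        for (int m : sinkMaxInFlight) {
            total += m;
        }
        return total;
    }

    /** True when the combined demand is less than or equal to the client's limit. */
    static boolean fits(int limit, int[] sourceParallelism, int[] sinkMaxInFlight) {
        return totalDemand(sourceParallelism, sinkMaxInFlight) <= limit;
    }

    public static void main(String[] args) {
        int limit = 100;             // assumed to match maxConcurrency(100) above
        int[] sources = {10, 10};    // parallelism of two sources sharing the client
        int[] sinks = {20, 40};      // maxInFlight of two sinks sharing the client
        System.out.println(fits(limit, sources, sinks)); // 80 <= 100, prints "true"
    }
}
```

Running such a check at startup makes a misconfiguration visible immediately instead of surfacing later as stalled streams.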

AWS retry configuration

AWS SDK 2 supports retrying requests with exponential backoff.

The retry behaviour is configurable through the SdkDefaultClientBuilder.overrideConfiguration method by supplying a RetryPolicy.

Scala
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy
import software.amazon.awssdk.core.retry.conditions.RetryCondition

.overrideConfiguration(
  ClientOverrideConfiguration
    .builder()
    .retryPolicy(
      // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
      // See https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
      RetryPolicy.builder
        .backoffStrategy(BackoffStrategy.defaultStrategy)
        .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy)
        .numRetries(SdkDefaultRetrySetting.DEFAULT_MAX_RETRIES)
        .retryCondition(RetryCondition.defaultRetryCondition)
        .build
    )
    .build()
)
Java
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetrySetting;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.BackoffStrategy;
import software.amazon.awssdk.core.retry.conditions.RetryCondition;

.overrideConfiguration(
    ClientOverrideConfiguration.builder()
        .retryPolicy(
            // This example shows the AWS SDK 2 `RetryPolicy.defaultRetryPolicy()`
            // See
            // https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/core/retry/RetryPolicy.html
            RetryPolicy.builder()
                .backoffStrategy(BackoffStrategy.defaultStrategy())
                .throttlingBackoffStrategy(BackoffStrategy.defaultThrottlingStrategy())
                .numRetries(SdkDefaultRetrySetting.DEFAULT_MAX_RETRIES)
                .retryCondition(RetryCondition.defaultRetryCondition())
                .build())
        .build())
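To make the term exponential backoff concrete, here is a minimal, self-contained sketch of how such a delay can be computed. It is not the SDK's implementation: the class ExponentialBackoffSketch, its method, and the base and maximum delays used in the example are assumptions chosen for illustration, and the SDK's own strategies may additionally randomize the delay (jitter), which this sketch omits.

```java
/** Illustrative only; not the AWS SDK's BackoffStrategy implementation. */
final class ExponentialBackoffSketch {

    /**
     * Returns baseDelayMillis doubled once per retry already attempted,
     * capped at maxDelayMillis.
     */
    static long delayMillis(int retriesAttempted, long baseDelayMillis, long maxDelayMillis) {
        if (retriesAttempted < 0 || baseDelayMillis < 0 || maxDelayMillis < 0) {
            throw new IllegalArgumentException("arguments must be non-negative");
        }
        long delay = baseDelayMillis;
        for (int i = 0; i < retriesAttempted && delay < maxDelayMillis; i++) {
            // Jump straight to the cap when doubling would exceed it;
            // this also rules out arithmetic overflow.
            delay = (delay > maxDelayMillis / 2) ? maxDelayMillis : delay * 2;
        }
        return Math.min(delay, maxDelayMillis);
    }

    public static void main(String[] args) {
        // Hypothetical settings: 100 ms base delay, 20 s cap.
        for (int retry = 0; retry <= 10; retry++) {
            System.out.println("retry " + retry + ": " + delayMillis(retry, 100, 20_000) + " ms");
        }
    }
}
```

With these assumed settings the delay doubles from 100 ms on each retry until it reaches the 20 s cap and then stays there.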

AWS Access Keys

Do not encode AWS Access Keys in your source code or in static configuration. Please refer to Best Practices for Managing AWS Access Keys for proper AWS Access Key management.
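One common way to keep keys out of source code is to read them from the process environment at startup. The sketch below illustrates the idea with plain Java; the class EnvCredentialsSketch and its require method are hypothetical names, while AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are the standard AWS environment variable names. The AWS SDK also ships credentials providers (for example DefaultCredentialsProvider) that resolve credentials from the environment themselves, so a hand-written lookup like this is rarely needed in practice.

```java
import java.util.Map;

/** Hypothetical helper showing credentials read from the environment, not from source code. */
final class EnvCredentialsSketch {

    static final String ACCESS_KEY_VAR = "AWS_ACCESS_KEY_ID";
    static final String SECRET_KEY_VAR = "AWS_SECRET_ACCESS_KEY";

    /** Returns the value of the named variable, failing fast when it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        String accessKeyId = require(env, ACCESS_KEY_VAR);
        String secretAccessKey = require(env, SECRET_KEY_VAR);
        // The two values could now be passed to AwsBasicCredentials.create(...)
        // in place of the "x" placeholders used in the snippets above.
        // Never log the secret itself; report only that it was found.
        System.out.println("Credentials loaded for access key of length " + accessKeyId.length()
            + ", secret present: " + !secretAccessKey.isEmpty());
    }
}
```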
