AWS SQS

Amazon Simple Queue Service

Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components. Amazon SQS offers common constructs such as dead-letter queues and cost allocation tags. It provides a generic web services API and it can be accessed by any programming language that the AWS SDK supports.

For more information about AWS SQS, please visit the official documentation.

The AWS SQS connector provides Akka Stream sources and sinks for AWS SQS queues.

Project Info: Alpakka AWS SQS
Artifact: com.lightbend.akka, akka-stream-alpakka-sqs, 1.0-M2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.aws.sqs
License
Readiness level: Community-driven (since 0.3, 2016-12-02)
Home page: https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes: In the documentation
Issues: Github issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-sqs_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-sqs_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.amazonaws    aws-java-sdk-sqs    1.11.476    Apache License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.amazonaws    aws-java-sdk-sqs    1.11.476    Apache License, Version 2.0
    com.amazonaws    aws-java-sdk-core    1.11.476    Apache License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
        com.fasterxml.jackson.dataformat    jackson-dataformat-cbor    2.6.7    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
        commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
        joda-time    joda-time    2.8.1    Apache 2
        org.apache.httpcomponents    httpclient    4.5.5    Apache License, Version 2.0
            commons-codec    commons-codec    1.10    Apache License, Version 2.0
            commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
            org.apache.httpcomponents    httpcore    4.4.9    Apache License, Version 2.0
        software.amazon.ion    ion-java    1.0.2    The Apache License, Version 2.0
    com.amazonaws    jmespath-java    1.11.476    Apache License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Setup

Prepare an ActorSystem and a Materializer.

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()
Java
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);

This connector requires an AmazonSQSAsync instance to communicate with AWS SQS. In Scala it is picked up as an implicit value; in Java it is passed explicitly to the factory methods.

It is your code’s responsibility to call shutdown to free any resources held by the client. In this example it will be called when the actor system is terminated.

Scala
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}

val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
implicit val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder
  .standard()
  .withCredentials(credentialsProvider)
  .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1"))
  .build()
system.registerOnTermination(awsSqsClient.shutdown())
Java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;

AWSCredentialsProvider credentialsProvider =
    new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));
AmazonSQSAsync awsSqsClient =
    AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1"))
        .build();
system.registerOnTermination(() -> awsSqsClient.shutdown());

Read from an SQS queue

SqsSource creates a source that reads AWS Java SDK SQS Message objects from the SQS queue identified by the given queue URL.

Scala
SqsSource(
  queueUrl,
  SqsSourceSettings().withCloseOnEmptyReceive(true).withWaitTime(1.second)
).runWith(Sink.seq)
Java
SqsSource.create(
        queueUrl,
        SqsSourceSettings.create()
            .withCloseOnEmptyReceive(true)
            .withWaitTime(Duration.ofSeconds(1)),
        sqsClient)
    .runWith(Sink.seq(), materializer);

In this example we use closeOnEmptyReceive to let the stream complete when there are no more messages on the queue. In realistic scenarios, you should add a KillSwitch to the stream; see “Controlling stream completion with KillSwitch” in the Akka documentation.

Source configuration

Scala
val settings = SqsSourceSettings()
  .withWaitTime(20.seconds)
  .withMaxBufferSize(100)
  .withMaxBatchSize(10)
  .withAttributes(immutable.Seq(SenderId, SentTimestamp))
  .withMessageAttribute(MessageAttributeName.create("bar.*"))
  .withCloseOnEmptyReceive(true)
  .withVisibilityTimeout(10.seconds)
Java
SqsSourceSettings settings =
    SqsSourceSettings.create()
        .withWaitTime(Duration.ofSeconds(20))
        .withMaxBufferSize(100)
        .withMaxBatchSize(10)
        .withAttributes(Arrays.asList(Attribute.senderId(), Attribute.sentTimestamp()))
        .withMessageAttribute(MessageAttributeName.create("bar.*"))
        .withCloseOnEmptyReceive(true);

Options:

  • maxBatchSize - the maximum number of messages to return (see MaxNumberOfMessages in AWS docs). Default: 10
  • maxBufferSize - internal buffer size used by the Source. Default: 100 messages
  • waitTimeSeconds - the duration for which the call waits for a message to arrive in the queue before returning (see WaitTimeSeconds in AWS docs). Default: 20 seconds
  • closeOnEmptyReceive - if true, the source completes when no messages are available. Default: false

More details are available in the AWS SQS Receive Message documentation.

An SqsSource can either provide an infinite stream of messages (the default), or can drain its source queue until no further messages are available. The latter behaviour is enabled by setting the closeOnEmptyReceive flag on creation. If set, the Source will receive messages until it encounters an empty reply from the server. It then continues to emit any remaining messages in its local buffer. The stage will complete once the last message has been sent downstream.

Note that for short-polling (waitTimeSeconds of 0), SQS may respond with an empty reply even if there are still messages in the queue. This behavior can be prevented by switching to long-polling (by setting waitTimeSeconds to a nonzero value).
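The drain behaviour described above can be pictured with a small, self-contained simulation. This is only a sketch: DrainSimulation, receive and drain are hypothetical names, an in-memory deque stands in for the SQS queue, and nothing here calls the connector or the AWS SDK.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical model of closeOnEmptyReceive; not the connector's implementation.
final class DrainSimulation {

  // Stand-in for one receive call: returns at most maxBatchSize messages.
  static List<String> receive(Deque<String> queue, int maxBatchSize) {
    List<String> batch = new ArrayList<>();
    while (batch.size() < maxBatchSize && !queue.isEmpty()) {
      batch.add(queue.poll());
    }
    return batch;
  }

  // Keeps receiving until the first empty reply, then returns everything received.
  static List<String> drain(Deque<String> queue, int maxBatchSize) {
    List<String> emitted = new ArrayList<>();
    while (true) {
      List<String> batch = receive(queue, maxBatchSize);
      if (batch.isEmpty()) {
        return emitted; // empty reply: complete once the buffered messages are emitted
      }
      emitted.addAll(batch);
    }
  }

  public static void main(String[] args) {
    Deque<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
    System.out.println(drain(queue, 2)); // prints [a, b, c]
  }
}
```

In this model an empty reply always means the queue is empty. With short-polling that does not necessarily hold for the real service, which is why long-polling is the safer choice when draining a queue.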

Be aware that the SqsSource runs multiple requests to Amazon SQS in parallel. The maximum number of concurrent requests is limited by parallelism = maxBufferSize / maxBatchSize. For example, with the defaults (maxBatchSize of 10 and maxBufferSize of 100), SqsSource runs at most 10 concurrent requests to Amazon SQS. AmazonSQSAsyncClient uses a fixed thread pool with 50 threads by default. To tune the thread pool used by AmazonSQSAsyncClient, you can supply a custom ExecutorService on client creation.

Scala
val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
implicit val customSqsClient: AmazonSQSAsync =
  AmazonSQSAsyncClientBuilder
    .standard()
    .withCredentials(credentialsProvider)
    .withExecutorFactory(new ExecutorFactory {
      override def newExecutor() = Executors.newFixedThreadPool(10)
    })
    .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1"))
    .build()
Java
AWSCredentialsProvider credentialsProvider =
    new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));

AmazonSQSAsync customSqsClient =
    AmazonSQSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withExecutorFactory(() -> Executors.newFixedThreadPool(10))
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1"))
        .build();

Please make sure to configure a thread pool that is big enough to avoid resource starvation. This is especially important if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources, the sum of all parallelism (Source) and maxInFlight (Sink) values must be less than or equal to the thread pool size.
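The sizing rule can be checked with a few lines of arithmetic before wiring up a shared client. The following is a minimal sketch: SqsThreadBudget and its methods are hypothetical helpers, not part of Alpakka or the AWS SDK, and integer division is assumed for the parallelism formula.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper; the names below are not part of Alpakka or the AWS SDK.
final class SqsThreadBudget {

  // Concurrent receive requests of one source: maxBufferSize / maxBatchSize
  // (integer division is an assumption of this sketch).
  static int sourceParallelism(int maxBufferSize, int maxBatchSize) {
    return maxBufferSize / maxBatchSize;
  }

  // True if all sources and sinks sharing one client fit into its thread pool.
  static boolean fitsThreadPool(
      List<Integer> sourceParallelisms, List<Integer> sinkMaxInFlights, int threadPoolSize) {
    int total = 0;
    for (int parallelism : sourceParallelisms) {
      total += parallelism;
    }
    for (int maxInFlight : sinkMaxInFlights) {
      total += maxInFlight;
    }
    return total <= threadPoolSize;
  }

  public static void main(String[] args) {
    int parallelism = sourceParallelism(100, 10); // source defaults give 10
    // One default source plus one sink with maxInFlight = 10, pool of 10 threads.
    System.out.println(fitsThreadPool(Arrays.asList(parallelism), Arrays.asList(10), 10)); // false
    // The same stages against the client's default pool of 50 threads.
    System.out.println(fitsThreadPool(Arrays.asList(parallelism), Arrays.asList(10), 50)); // true
  }
}
```

With a custom pool of 10 threads, as in the client example above, a single default source already uses the whole budget, so any additional sink or flow sharing that client would need a larger pool.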

Publish messages to an SQS queue

Create a String-accepting sink, publishing to an SQS queue.

Scala
Source
  .single("alpakka")
  .runWith(SqsPublishSink(queueUrl))
Java
Source.single("alpakka")
    .runWith(
        SqsPublishSink.create(queueUrl, SqsPublishSettings.create(), sqsClient),
        materializer);

Create a SendMessageRequest-accepting sink that publishes to an SQS queue.

Scala
// for a fixed SQS queue
Source
  .single(new SendMessageRequest().withMessageBody("alpakka"))
  .runWith(SqsPublishSink.messageSink(queueUrl))

// for dynamic SQS queues
Source
  .single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl))
  .runWith(SqsPublishSink.messageSink())
Java
// for a fixed SQS queue
Source.single(new SendMessageRequest().withMessageBody("alpakka"))
    .runWith(
        SqsPublishSink.messageSink(queueUrl, SqsPublishSettings.create(), sqsClient),
        materializer);

// for dynamic SQS queues
Source.single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl))
    .runWith(
        SqsPublishSink.messageSink(SqsPublishSettings.create(), sqsClient), materializer);

You can also build flow stages which publish messages to SQS queues, backpressure on queue response, and then forward SqsPublishResult further down the stream.

Scala
// for a fixed SQS queue
Source
  .single(new SendMessageRequest().withMessageBody("alpakka"))
  .via(SqsPublishFlow(queueUrl))
  .runWith(Sink.head)

// for dynamic SQS queues
Source
  .single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl))
  .via(SqsPublishFlow())
  .runWith(Sink.head)
Java
// for a fixed SQS queue
Source.single(new SendMessageRequest().withMessageBody("alpakka-flow"))
    .via(SqsPublishFlow.create(queueUrl, SqsPublishSettings.create(), sqsClient))
    .runWith(Sink.head(), materializer);

// for dynamic SQS queues
Source.single(new SendMessageRequest(queueUrl, "alpakka-flow"))
    .via(SqsPublishFlow.create(SqsPublishSettings.create(), sqsClient))
    .runWith(Sink.head(), materializer);

Group messages and publish batches to an SQS queue

Create a sink that forwards String messages to the SQS queue. The main difference from the previous use case is that it batches items and sends each batch as one request.

Note: Another option for sending batches of messages to SQS is AmazonSQSBufferedAsyncClient. This client buffers SendMessageRequests under the hood and sends them as a batch instead of sending them one by one. However, beware that AmazonSQSBufferedAsyncClient does not support FIFO queues. See the documentation for client-side buffering.

Scala
val messages = for (i <- 0 until 10) yield s"Message - $i"

val future = Source(messages)
  .runWith(SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.Defaults.withMaxBatchSize(2)))
Java
List<String> messagesToSend = new ArrayList<>();
for (int i = 0; i < 20; i++) {
  messagesToSend.add("message - " + i);
}

CompletionStage<Done> done =
    Source.from(messagesToSend)
        .runWith(
            SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.create(), sqsClient),
            materializer);

Grouping configuration

Scala
val batchSettings =
  SqsPublishGroupedSettings()
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
Java
SqsPublishGroupedSettings batchSettings =
    SqsPublishGroupedSettings.create()
        .withMaxBatchSize(10)
        .withMaxBatchWait(Duration.ofMillis(500))
        .withConcurrentRequests(1);

Options:

  • maxBatchSize - the maximum number of messages in a batch sent to SQS. Default: 10.
  • maxBatchWait - the maximum duration for which the stage waits for maxBatchSize messages to arrive. At the end of this period it sends whatever it has collected, even if maxBatchSize has not been reached. Default: 500 milliseconds
  • concurrentRequests - the number of batches sent to SQS concurrently.
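How maxBatchSize and maxBatchWait interact can be illustrated with a simplified model that works on message arrival times. This is a sketch under stated assumptions, not the connector's implementation: GroupingModel is a hypothetical class, and the model assumes that the wait window starts when the first message of a batch arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of size-or-time batching; not the connector's code.
final class GroupingModel {

  // arrivalMillis must be sorted ascending; returns the arrival times grouped into batches.
  static List<List<Long>> group(
      List<Long> arrivalMillis, int maxBatchSize, long maxBatchWaitMillis) {
    List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long windowStart = 0L;
    for (long t : arrivalMillis) {
      // The window expired before this message arrived: flush the partial batch.
      if (!current.isEmpty() && t - windowStart >= maxBatchWaitMillis) {
        batches.add(current);
        current = new ArrayList<>();
      }
      if (current.isEmpty()) {
        windowStart = t; // assumption: the window starts with the first message of a batch
      }
      current.add(t);
      // The batch is full: send it without waiting for the window to end.
      if (current.size() == maxBatchSize) {
        batches.add(current);
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // upstream finished: flush what is left
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Long> arrivals = Arrays.asList(0L, 10L, 20L, 600L, 610L);
    System.out.println(group(arrivals, 2, 500L)); // prints [[0, 10], [20], [600, 610]]
  }
}
```

The message arriving at 20 ms ends up in a batch of its own because no second message arrives within the 500 ms window, which matches the description of maxBatchWait above.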

Publish lists as batches to an SQS queue

Create a sink that publishes Iterable[String] (Scala) or Iterable<String> (Java) to the SQS queue.

Be aware that the size of the batch must be less than or equal to 10 because Amazon SQS has a limit for batch requests. If the batch has more than 10 entries, the request will fail.

Scala
val messages = for (i <- 0 until 10) yield s"Message - $i"

val future = Source
  .single(messages)
  .runWith(SqsPublishSink.batch(queueUrl))
Java
List<String> messagesToSend = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messagesToSend.add("Message - " + i);
}
Iterable<String> it = messagesToSend;

CompletionStage<Done> done =
    Source.single(it)
        .runWith(
            SqsPublishSink.batch(queueUrl, SqsPublishBatchSettings.create(), sqsClient),
            materializer);

Create a sink that publishes Iterable[SendMessageRequest] (Scala) or Iterable<SendMessageRequest> (Java) to the SQS queue.

Warning

Be aware that the size of the batch must be less than or equal to 10 because Amazon SQS has a limit for batch requests. If the batch has more than 10 entries, the request will fail.

Scala
val messages = for (i <- 0 until 10) yield new SendMessageRequest().withMessageBody(s"Message - $i")

val future = Source
  .single(messages)
  .runWith(SqsPublishSink.batchedMessageSink(queueUrl))
Java
List<SendMessageRequest> messagesToSend = new ArrayList<>();
for (int i = 0; i < 10; i++) {
  messagesToSend.add(new SendMessageRequest().withMessageBody("Message - " + i));
}
Iterable<SendMessageRequest> it = messagesToSend;

CompletionStage<Done> done =
    Source.single(it)
        .runWith(
            SqsPublishSink.batchedMessageSink(
                queueUrl, SqsPublishBatchSettings.create(), sqsClient),
            materializer);
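Because a batch with more than 10 entries fails, a longer list has to be split before it reaches the batch sinks. The following is a minimal sketch of such a split using only the Java standard library; BatchChunker and chunk are hypothetical names and not part of Alpakka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for pre-chunking; not part of Alpakka.
final class BatchChunker {

  static final int SQS_MAX_BATCH_ENTRIES = 10;

  // Splits items into consecutive chunks of at most maxEntries elements.
  static <T> List<List<T>> chunk(List<T> items, int maxEntries) {
    if (maxEntries < 1 || maxEntries > SQS_MAX_BATCH_ENTRIES) {
      throw new IllegalArgumentException("maxEntries must be between 1 and 10");
    }
    List<List<T>> chunks = new ArrayList<>();
    for (int from = 0; from < items.size(); from += maxEntries) {
      int to = Math.min(from + maxEntries, items.size());
      chunks.add(new ArrayList<>(items.subList(from, to)));
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<String> messages = new ArrayList<>();
    for (int i = 0; i < 25; i++) {
      messages.add("Message - " + i);
    }
    for (List<String> batch : chunk(messages, SQS_MAX_BATCH_ENTRIES)) {
      System.out.println(batch.size()); // prints 10, 10 and 5 on separate lines
    }
  }
}
```

Each resulting chunk could then be emitted as one stream element for the batch sink. If the messages arrive one by one rather than as a ready-made list, the grouped sink described earlier already takes care of the batching.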

Batch configuration

Scala
val batchSettings =
  SqsPublishBatchSettings()
    .withConcurrentRequests(1)
Java
SqsPublishBatchSettings batchSettings =
    SqsPublishBatchSettings.create().withConcurrentRequests(1);

Options:

  • concurrentRequests - the number of batches sending to SQS concurrently.

Updating message statuses

SqsAckSink and SqsAckFlow provide the possibility to acknowledge (delete), ignore, or postpone messages on an SQS queue. They accept MessageAction sub-classes to select the action to be taken.

For every message, you may decide which action to take and push it, together with the message, back to the queue:

  • Delete - delete message from the queue
  • Ignore - don’t change that message, and let it reappear in the queue after the visibility timeout
  • ChangeMessageVisibility(visibilityTimeout) - can be used to postpone a message, or make the message immediately visible to other consumers. See the official documentation for more details.
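Choosing between the three actions usually depends on how processing went. The following is a minimal sketch of such a decision, assuming a hypothetical Outcome reported by your own processing code; AckDecision and its nested types are illustrative only and do not use Alpakka's MessageAction. The 12-hour cap reflects the SQS upper limit for a visibility timeout.

```java
// Hypothetical decision model; Alpakka's MessageAction is not used here.
final class AckDecision {

  enum Outcome { PROCESSED, RETRY_AFTER_TIMEOUT, RETRY_NOW, POSTPONE }

  enum Kind { DELETE, IGNORE, CHANGE_VISIBILITY }

  static final int MAX_VISIBILITY_TIMEOUT_SECONDS = 12 * 60 * 60; // SQS limit: 12 hours

  final Kind kind;
  final int visibilityTimeoutSeconds; // only meaningful for CHANGE_VISIBILITY

  private AckDecision(Kind kind, int visibilityTimeoutSeconds) {
    this.kind = kind;
    this.visibilityTimeoutSeconds = visibilityTimeoutSeconds;
  }

  // Maps a processing outcome to one of the three actions described above.
  static AckDecision decide(Outcome outcome, int postponeSeconds) {
    switch (outcome) {
      case PROCESSED:
        return new AckDecision(Kind.DELETE, 0); // done: remove from the queue
      case RETRY_NOW:
        return new AckDecision(Kind.CHANGE_VISIBILITY, 0); // visible to consumers at once
      case POSTPONE: {
        // Clamp the requested delay to the range SQS accepts.
        int clamped = Math.max(0, Math.min(postponeSeconds, MAX_VISIBILITY_TIMEOUT_SECONDS));
        return new AckDecision(Kind.CHANGE_VISIBILITY, clamped);
      }
      default:
        return new AckDecision(Kind.IGNORE, 0); // reappears after the current timeout
    }
  }

  public static void main(String[] args) {
    AckDecision decision = decide(Outcome.POSTPONE, 100_000);
    System.out.println(decision.kind + " " + decision.visibilityTimeoutSeconds);
    // prints CHANGE_VISIBILITY 43200
  }
}
```

In a stream, each decision would be translated into the corresponding MessageAction (delete, ignore or changeMessageVisibility) for the message, as the examples in the following sections show.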

Acknowledge (delete) messages

Scala
SqsSource(queueUrl)
  .take(1)
  .map(MessageAction.Delete(_))
  .runWith(SqsAckSink(queueUrl))
Java
source
    .map(m -> MessageAction.delete(m))
    .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), awsClient), materializer);

Ignore messages

Scala
SqsSource(queueUrl)
  .map(MessageAction.Ignore(_))
  .runWith(SqsAckSink(queueUrl))
Java
source
    .map(m -> MessageAction.ignore(m))
    .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), awsClient))
    .runWith(Sink.seq(), materializer);

Change Visibility Timeout of messages

Scala
SqsSource(queueUrl)
  .take(1)
  .map(MessageAction.ChangeMessageVisibility(_, 5))
  .runWith(SqsAckSink(queueUrl))
Java
source
    .map(m -> MessageAction.changeMessageVisibility(m, 12))
    .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), awsClient), materializer);

Update message status in a flow

The flow accepts MessageAction sub-classes and returns an SqsAckResult.

Scala
SqsSource(queueUrl)
  .take(1)
  .map(MessageAction.Delete(_))
  .via(SqsAckFlow(queueUrl))
  .runWith(Sink.head)
Java
source
    .map(m -> MessageAction.delete(m))
    .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), awsClient))
    .runWith(Sink.seq(), materializer);

SqsAck configuration

Scala
val sinkSettings =
  SqsAckSettings()
    .withMaxInFlight(10)
Java
SqsAckSettings sinkSettings = SqsAckSettings.create().withMaxInFlight(10);

Options:

  • maxInFlight - maximum number of messages being processed by AmazonSQSAsync at the same time. Default: 10

Updating message statuses in batches with grouping

SqsAckFlow.grouped is a flow that can acknowledge (delete), ignore, or postpone messages, but it batches items and sends them as one request per action.
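The phrase "one request per action" can be pictured with a small counting model. This is a sketch only: GroupedAckModel is a hypothetical class, it ignores the maxBatchWait timer, and it assumes that Ignore needs no request because the message is left untouched.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of per-action batching; not the connector's implementation.
final class GroupedAckModel {

  enum Kind { DELETE, CHANGE_VISIBILITY, IGNORE }

  // Counts the batch requests needed per action kind for the given actions.
  static Map<Kind, Integer> requestsPerKind(List<Kind> actions, int maxBatchSize) {
    Map<Kind, Integer> counts = new EnumMap<>(Kind.class);
    for (Kind kind : Kind.values()) {
      counts.put(kind, 0);
    }
    for (Kind action : actions) {
      counts.put(action, counts.get(action) + 1);
    }

    Map<Kind, Integer> requests = new EnumMap<>(Kind.class);
    for (Kind kind : Kind.values()) {
      int n = counts.get(kind);
      // Assumption: Ignore leaves the message untouched, so it needs no request.
      int needed = kind == Kind.IGNORE ? 0 : (n + maxBatchSize - 1) / maxBatchSize;
      requests.put(kind, needed);
    }
    return requests;
  }
}
```

For example, 12 delete actions and 3 visibility changes with a maxBatchSize of 10 would need two delete batches and one visibility batch in this model, instead of 15 individual requests.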

Acknowledge (delete) messages:

Scala
SqsSource(queueUrl)
  .take(10)
  .map(MessageAction.Delete(_))
  .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults))
  .runWith(Sink.seq)
Java
source
    .map(m -> MessageAction.delete(m))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient))
    .runWith(Sink.seq(), materializer);

Ignore messages:

Scala
Source(messages)
  .take(10)
  .map(MessageAction.Ignore(_))
  .via(SqsAckFlow.grouped("queue", SqsAckGroupedSettings.Defaults))
  .runWith(Sink.seq)
Java
source
    .map(m -> MessageAction.ignore(m))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient))
    .runWith(Sink.seq(), materializer);

Change Visibility Timeout of messages:

Scala
SqsSource(queueUrl)
  .take(10)
  .map(MessageAction.ChangeMessageVisibility(_, 5))
  .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults))
  .runWith(Sink.seq)
Java
source
    .map(m -> MessageAction.changeMessageVisibility(m, 5))
    .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient))
    .runWith(Sink.seq(), materializer);

Acknowledge grouping configuration

Scala
val batchSettings =
  SqsAckGroupedSettings()
    .withMaxBatchSize(10)
    .withMaxBatchWait(500.millis)
    .withConcurrentRequests(1)
Java
SqsAckGroupedSettings flowSettings =
    SqsAckGroupedSettings.create()
        .withMaxBatchSize(10)
        .withMaxBatchWait(Duration.ofMillis(500))
        .withConcurrentRequests(1);

Options:

  • maxBatchSize - the maximum number of messages in a batch sent to SQS. Default: 10.
  • maxBatchWait - the maximum duration for which the stage waits for maxBatchSize messages to arrive. At the end of this period it sends whatever it has collected, even if maxBatchSize has not been reached. Default: 500 milliseconds
  • concurrentRequests - the number of batches sent to SQS concurrently.

Integration testing

For integration testing without touching Amazon SQS, Alpakka uses ElasticMQ, a queuing service which serves an AWS SQS compatible API.

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

The code requires ElasticMQ running in the background. You can start it quickly using docker:

docker-compose up elasticmq

Scala
sbt 'project sqs' test
Java
sbt 'project sqs' test