AWS SQS
Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components. Amazon SQS offers common constructs such as dead-letter queues and cost allocation tags. It provides a generic web services API and it can be accessed by any programming language that the AWS SDK supports.
For more information about AWS SQS please visit the official documentation.
The AWS SQS connector provides Akka Stream sources and sinks for AWS SQS queues.
Project Info: Alpakka AWS SQS | |
---|---|
Artifact | com.lightbend.akka
akka-stream-alpakka-sqs
1.0-M2
|
JDK versions | OpenJDK 8 |
Scala versions | 2.12.7, 2.11.12 |
JPMS module name | akka.stream.alpakka.aws.sqs |
License | |
Readiness level | Community-driven
Since 0.3, 2016-12-02
|
Home page | https://doc.akka.io/docs/alpakka/current/ |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sqs" % "1.0-M2"
- Maven
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-sqs_2.12</artifactId> <version>1.0-M2</version> </dependency>
- Gradle
dependencies { compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-sqs_2.12', version: '1.0-M2' }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization Artifact Version License com.amazonaws aws-java-sdk-sqs 1.11.476 Apache License, Version 2.0 com.typesafe.akka akka-stream_2.12 2.5.19 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause - Dependency tree
com.amazonaws aws-java-sdk-sqs 1.11.476 Apache License, Version 2.0 com.amazonaws aws-java-sdk-core 1.11.476 Apache License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.6.7.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.6.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.6.7 The Apache Software License, Version 2.0 com.fasterxml.jackson.dataformat jackson-dataformat-cbor 2.6.7 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.6.7 The Apache Software License, Version 2.0 commons-logging commons-logging 1.1.3 The Apache Software License, Version 2.0 joda-time joda-time 2.8.1 Apache 2 org.apache.httpcomponents httpclient 4.5.5 Apache License, Version 2.0 commons-codec commons-codec 1.10 Apache License, Version 2.0 commons-logging commons-logging 1.1.3 The Apache Software License, Version 2.0 org.apache.httpcomponents httpcore 4.4.9 Apache License, Version 2.0 software.amazon.ion ion-java 1.0.2 The Apache License, Version 2.0 com.amazonaws jmespath-java 1.11.476 Apache License, Version 2.0 com.fasterxml.jackson.core jackson-databind 2.6.7.2 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-annotations 2.6.0 The Apache Software License, Version 2.0 com.fasterxml.jackson.core jackson-core 2.6.7 The Apache Software License, Version 2.0 com.typesafe.akka akka-stream_2.12 2.5.19 Apache License, Version 2.0 com.typesafe.akka akka-actor_2.12 2.5.19 Apache License, Version 2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-java8-compat_2.12 0.8.0 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe.akka akka-protobuf_2.12 2.5.19 Apache License, Version 2.0 org.scala-lang scala-library 2.12.7 BSD 3-Clause com.typesafe ssl-config-core_2.12 0.3.6 Apache-2.0 com.typesafe config 1.3.3 Apache License, Version 2.0 org.scala-lang.modules scala-parser-combinators_2.12 1.1.1 BSD 3-clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause org.reactivestreams reactive-streams 1.0.2 CC0 org.scala-lang scala-library 2.12.7 BSD 3-Clause org.scala-lang scala-library 2.12.7 BSD 3-Clause
Setup
Prepare an ActorSystem and a Materializer.
- Scala
-
implicit val system: ActorSystem = ActorSystem() implicit val mat: Materializer = ActorMaterializer()
- Java
-
system = ActorSystem.create(); materializer = ActorMaterializer.create(system);
This connector requires an implicit AmazonSQSAsync
instance to communicate with AWS SQS.
It is your code’s responsibility to call shutdown
to free any resources held by the client. In this example it will be called when the actor system is terminated.
- Scala
-
import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials} import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder} val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")) implicit val awsSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder .standard() .withCredentials(credentialsProvider) .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build() system.registerOnTermination(awsSqsClient.shutdown())
- Java
-
import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.sqs.AmazonSQSAsync; import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder; AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")); AmazonSQSAsync awsSqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build(); system.registerOnTermination(() -> awsSqsClient.shutdown());
Read from an SQS queue
The SqsSourceSqsSource created source reads AWS Java SDK SQS Message
objects from any SQS queue given by the queue URL.
- Scala
-
SqsSource( queueUrl, SqsSourceSettings().withCloseOnEmptyReceive(true).withWaitTime(1.second) ).runWith(Sink.seq)
- Java
-
SqsSource.create( queueUrl, SqsSourceSettings.create() .withCloseOnEmptyReceive(true) .withWaitTime(Duration.ofSeconds(1)), sqsClient) .runWith(Sink.seq(), materializer);
In this example we use the closeOnEmptyReceive
to let the stream complete when there are no more messages on the queue. In realistic scenarios, you should add a KillSwitch
to the stream, see “Controlling stream completion with KillSwitch” in the Akka documentation.
Source configuration
- Scala
-
val settings = SqsSourceSettings() .withWaitTime(20.seconds) .withMaxBufferSize(100) .withMaxBatchSize(10) .withAttributes(immutable.Seq(SenderId, SentTimestamp)) .withMessageAttribute(MessageAttributeName.create("bar.*")) .withCloseOnEmptyReceive(true) .withVisibilityTimeout(10.seconds)
- Java
-
SqsSourceSettings settings = SqsSourceSettings.create() .withWaitTime(Duration.ofSeconds(20)) .withMaxBufferSize(100) .withMaxBatchSize(10) .withAttributes(Arrays.asList(Attribute.senderId(), Attribute.sentTimestamp())) .withMessageAttribute(MessageAttributeName.create("bar.*")) .withCloseOnEmptyReceive(true);
Options:
maxBatchSize
- the maximum number of messages to return (seeMaxNumberOfMessages
in AWS docs). Default: 10maxBufferSize
- internal buffer size used by theSource
. Default: 100 messageswaitTimeSeconds
- the duration for which the call waits for a message to arrive in the queue before returning (seeWaitTimeSeconds
in AWS docs). Default: 20 secondscloseOnEmptyReceive
- If true, the source completes when no messages are available.
More details are available in the AWS SQS Receive Message documentation.
An SqsSource
can either provide an infinite stream of messages (the default), or can drain its source queue until no further messages are available. The latter behaviour is enabled by setting the closeOnEmptyReceive
flag on creation. If set, the Source
will receive messages until it encounters an empty reply from the server. It then continues to emit any remaining messages in its local buffer. The stage will complete once the last message has been sent downstream.
Note that for short-polling (waitTimeSeconds
of 0), SQS may respond with an empty reply even if there are still messages in the queue. This behavior can be prevented by switching to long-polling (by setting waitTimeSeconds
to a nonzero value).
Be aware that the SqsSource
runs multiple requests to Amazon SQS in parallel. The maximum number of concurrent requests is limited by parallelism = maxBufferSize / maxBatchSize
. E.g.: By default maxBatchSize
is set to 10 and maxBufferSize
is set to 100 so at the maximum, SqsSource
will run 10 concurrent requests to Amazon SQS. AmazonSQSAsyncClient
uses a fixed thread pool with 50 threads by default. To tune the thread pool used by AmazonSQSAsyncClient
you can supply a custom ExecutorService
on client creation.
- Scala
-
val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")) implicit val customSqsClient: AmazonSQSAsync = AmazonSQSAsyncClientBuilder .standard() .withCredentials(credentialsProvider) .withExecutorFactory(new ExecutorFactory { override def newExecutor() = Executors.newFixedThreadPool(10) }) .withEndpointConfiguration(new EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build()
- Java
-
AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")); AmazonSQSAsync customSqsClient = AmazonSQSAsyncClientBuilder.standard() .withCredentials(credentialsProvider) .withExecutorFactory(() -> Executors.newFixedThreadPool(10)) .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration(sqsEndpoint, "eu-central-1")) .build();
Please make sure to configure a big enough thread pool to avoid resource starvation. This is especially important, if you share the client between multiple Sources, Sinks and Flows. For the SQS Sinks and Sources the sum of all parallelism
(Source) and maxInFlight
(Sink) must be less than or equal to the thread pool size.
Publish messages to an SQS queue
Create a String
-accepting sink, publishing to an SQS queue.
- Scala
-
Source .single("alpakka") .runWith(SqsPublishSink(queueUrl))
- Java
-
Source.single("alpakka") .runWith( SqsPublishSink.create(queueUrl, SqsPublishSettings.create(), sqsClient), materializer);
Create a SendMessageRequest
-accepting sink, that publishes an SQS queue.
- Scala
-
// for fix SQS queue Source .single(new SendMessageRequest().withMessageBody("alpakka")) .runWith(SqsPublishSink.messageSink(queueUrl)) // for dynamic SQS queues Source .single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl)) .runWith(SqsPublishSink.messageSink())
- Java
-
// for fix SQS queue Source.single(new SendMessageRequest().withMessageBody("alpakka")) .runWith( SqsPublishSink.messageSink(queueUrl, SqsPublishSettings.create(), sqsClient), materializer); // for dynamic SQS queues Source.single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl)) .runWith( SqsPublishSink.messageSink(SqsPublishSettings.create(), sqsClient), materializer);
You can also build flow stages which publish messages to SQS queues, backpressure on queue response, and then forward SqsPublishResult
further down the stream.
- Scala
-
// for fix SQS queue Source .single(new SendMessageRequest().withMessageBody("alpakka")) .via(SqsPublishFlow(queueUrl)) .runWith(Sink.head) // for dynamic SQS queues Source .single(new SendMessageRequest().withMessageBody("alpakka").withQueueUrl(queueUrl)) .via(SqsPublishFlow()) .runWith(Sink.head)
- Java
-
// for fix SQS queue Source.single(new SendMessageRequest().withMessageBody("alpakka-flow")) .via(SqsPublishFlow.create(queueUrl, SqsPublishSettings.create(), sqsClient)) .runWith(Sink.head(), materializer); // for dynamic SQS queues Source.single(new SendMessageRequest(queueUrl, "alpakka-flow")) .via(SqsPublishFlow.create(SqsPublishSettings.create(), sqsClient)) .runWith(Sink.head(), materializer);
Group messages and publish batches to an SQS queue
Create a sink, that forwards String
to the SQS queue. However, the main difference from the previous use case, it batches items and sends as a one request.
Note: There is also another option to send batch of messages to SQS which is using AmazonSQSBufferedAsyncClient
. This client buffers SendMessageRequest
s under the hood and sends them as a batch instead of sending them one by one. However, beware that AmazonSQSBufferedAsyncClient
does not support FIFO Queues. See documentation for client-side buffering.
- Scala
-
val messages = for (i <- 0 until 10) yield s"Message - $i" val future = Source(messages) .runWith(SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.Defaults.withMaxBatchSize(2)))
- Java
-
List<String> messagesToSend = new ArrayList<>(); for (int i = 0; i < 20; i++) { messagesToSend.add("message - " + i); } CompletionStage<Done> done = Source.from(messagesToSend) .runWith( SqsPublishSink.grouped(queueUrl, SqsPublishGroupedSettings.create(), sqsClient), materializer);
Grouping configuration
- Scala
-
val batchSettings = SqsPublishGroupedSettings() .withMaxBatchSize(10) .withMaxBatchWait(500.millis) .withConcurrentRequests(1)
- Java
-
SqsPublishGroupedSettings batchSettings = SqsPublishGroupedSettings.create() .withMaxBatchSize(10) .withMaxBatchWait(Duration.ofMillis(500)) .withConcurrentRequests(1);
Options:
maxBatchSize
- the maximum number of messages in batch to send SQS. Default: 10.maxBatchWait
- the maximum duration for which the stage waits untilmaxBatchSize
messages arrived. Sends what is collects at the end of the time period even though themaxBatchSize
is not fulfilled. Default: 500 millisecondsconcurrentRequests
- the number of batches sending to SQS concurrently.
Publish lists as batches to an SQS queue
Create a sink, that publishes Iterable[String]
Iterable<String>
to the SQS queue.
Be aware that the size of the batch must be less than or equal to 10 because Amazon SQS has a limit for batch request. If the batch has more than 10 entries, the request will fail.
- Scala
-
val messages = for (i <- 0 until 10) yield s"Message - $i" val future = Source .single(messages) .runWith(SqsPublishSink.batch(queueUrl))
- Java
-
List<String> messagesToSend = new ArrayList<>(); for (int i = 0; i < 10; i++) { messagesToSend.add("Message - " + i); } Iterable<String> it = messagesToSend; CompletionStage<Done> done = Source.single(it) .runWith( SqsPublishSink.batch(queueUrl, SqsPublishBatchSettings.create(), sqsClient), materializer);
Create a sink, that publishes Iterable[SendMessageRequest]
Iterable<SendMessageRequest>
to the SQS queue.
Be aware that the size of the batch must be less than or equal to 10 because Amazon SQS has a limit for batch requests. If the batch has more than 10 entries, the request will fail.
- Scala
-
val messages = for (i <- 0 until 10) yield new SendMessageRequest().withMessageBody(s"Message - $i") val future = Source .single(messages) .runWith(SqsPublishSink.batchedMessageSink(queueUrl))
- Java
-
List<SendMessageRequest> messagesToSend = new ArrayList<>(); for (int i = 0; i < 10; i++) { messagesToSend.add(new SendMessageRequest().withMessageBody("Message - " + i)); } Iterable<SendMessageRequest> it = messagesToSend; CompletionStage<Done> done = Source.single(it) .runWith( SqsPublishSink.batchedMessageSink( queueUrl, SqsPublishBatchSettings.create(), sqsClient), materializer);
Batch configuration
- Scala
-
val batchSettings = SqsPublishBatchSettings() .withConcurrentRequests(1)
- Java
-
SqsPublishBatchSettings batchSettings = SqsPublishBatchSettings.create().withConcurrentRequests(1);
Options:
concurrentRequests
- the number of batches sending to SQS concurrently.
Updating message statuses
SqsAckSink
and SqsAckFlow
provide the possibility to acknowledge (delete), ignore, or postpone messages on an SQS queue. They accept MessageAction
sub-classes to select the action to be taken.
For every message you may decide which action to take and push it together with message back to the queue:
Delete
- delete message from the queueIgnore
- don’t change that message, and let it reappear in the queue after the visibility timeoutChangeMessageVisibility(visibilityTimeout)
- can be used to postpone a message, or make the message immediately visible to other consumers. See official documentation for more details.
Acknowledge (delete) messages
- Scala
-
SqsSource(queueUrl) .take(1) .map(MessageAction.Delete(_)) .runWith(SqsAckSink(queueUrl))
- Java
-
source .map(m -> MessageAction.delete(m)) .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), awsClient), materializer);
Ignore messages
- Scala
-
SqsSource(queueUrl) .map(MessageAction.Ignore(_)) .runWith(SqsAckSink(queueUrl))
- Java
-
source .map(m -> MessageAction.ignore(m)) .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), awsClient)) .runWith(Sink.seq(), materializer);
Change Visibility Timeout of messages
- Scala
-
SqsSource(queueUrl) .take(1) .map(MessageAction.ChangeMessageVisibility(_, 5)) .runWith(SqsAckSink(queueUrl))
- Java
-
source .map(m -> MessageAction.changeMessageVisibility(m, 12)) .runWith(SqsAckSink.create(queueUrl, SqsAckSettings.create(), awsClient), materializer);
Update message status in a flow
The flow accepts a MessageAction
sub-classes, and returns SqsAckResult
.
- Scala
-
SqsSource(queueUrl) .take(1) .map(MessageAction.Delete(_)) .via(SqsAckFlow(queueUrl)) .runWith(Sink.head)
- Java
-
source .map(m -> MessageAction.delete(m)) .via(SqsAckFlow.create(queueUrl, SqsAckSettings.create(), awsClient)) .runWith(Sink.seq(), materializer);
SqsAck configuration
- Scala
-
val sinkSettings = SqsAckSettings() .withMaxInFlight(10)
- Java
-
SqsAckSettings sinkSettings = SqsAckSettings.create().withMaxInFlight(10);
Options:
maxInFlight
- maximum number of messages being processed byAmazonSQSAsync
at the same time. Default: 10
Updating message statuses in batches with grouping
SqsAckFlow.grouped
is a flow that can acknowledge (delete), ignore, or postpone messages, but it batches items and sends them as one request per action.
Acknowledge (delete) messages:
- Scala
-
SqsSource(queueUrl) .take(10) .map(MessageAction.Delete(_)) .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults)) .runWith(Sink.seq)
- Java
-
source .map(m -> MessageAction.delete(m)) .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient)) .runWith(Sink.seq(), materializer);
Ignore messages:
- Scala
-
Source(messages) .take(10) .map(MessageAction.Ignore(_)) .via(SqsAckFlow.grouped("queue", SqsAckGroupedSettings.Defaults)) .runWith(Sink.seq)
- Java
-
source .map(m -> MessageAction.ignore(m)) .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient)) .runWith(Sink.seq(), materializer);
Change Visibility Timeout of messages:
- Scala
-
SqsSource(queueUrl) .take(10) .map(MessageAction.ChangeMessageVisibility(_, 5)) .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.Defaults)) .runWith(Sink.seq)
- Java
-
source .map(m -> MessageAction.changeMessageVisibility(m, 5)) .via(SqsAckFlow.grouped(queueUrl, SqsAckGroupedSettings.create(), awsClient)) .runWith(Sink.seq(), materializer);
Acknowledge grouping configuration
- Scala
-
val batchSettings = SqsAckGroupedSettings() .withMaxBatchSize(10) .withMaxBatchWait(500.millis) .withConcurrentRequests(1)
- Java
-
SqsAckGroupedSettings flowSettings = SqsAckGroupedSettings.create() .withMaxBatchSize(10) .withMaxBatchWait(Duration.ofMillis(500)) .withConcurrentRequests(1);
Options:
maxBatchSize
- the maximum number of messages in batch to send SQS. Default: 10.maxBatchWait
- the maximum duration for which the stage waits untilmaxBatchSize
messages arrived. Sends what is collects at the end of the time period even though themaxBatchSize
is not fulfilled. Default: 500 millisecondsconcurrentRequests
- the number of batches sending to SQS concurrently.
Integration testing
For integration testing without touching Amazon SQS, Alpakka uses ElasticMQ, a queuing service which serves an AWS SQS compatible API.
Running the example code
The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.
The code requires ElasticMQ running in the background. You can start it quickly using docker:
docker-compose up elasticmq
- Scala
-
sbt 'project sqs' test
- Java
-
sbt 'project sqs' test