AWS SNS

The AWS SNS connector provides an Akka Stream Flow and Sink for push notifications through AWS SNS.

For more information about AWS SNS please visit the official documentation.

Project Info: Alpakka AWS SNS
Artifact    com.lightbend.akka    akka-stream-alpakka-sns    1.0-M2
JDK versions    OpenJDK 8
Scala versions    2.12.7, 2.11.12
JPMS module name    akka.stream.alpakka.aws.sns
License
Readiness level    Community-driven, since 0.8 (2017-05-05)
Home page    https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes    In the documentation
Issues    GitHub issues
Sources    https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-sns" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-sns_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-sns_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.amazonaws    aws-java-sdk-sns    1.11.476    Apache License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.amazonaws    aws-java-sdk-sns    1.11.476    Apache License, Version 2.0
    com.amazonaws    aws-java-sdk-core    1.11.476    Apache License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
        com.fasterxml.jackson.dataformat    jackson-dataformat-cbor    2.6.7    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
        commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
        joda-time    joda-time    2.8.1    Apache 2
        org.apache.httpcomponents    httpclient    4.5.5    Apache License, Version 2.0
            commons-codec    commons-codec    1.10    Apache License, Version 2.0
            commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
            org.apache.httpcomponents    httpcore    4.4.9    Apache License, Version 2.0
        software.amazon.ion    ion-java    1.0.2    The Apache License, Version 2.0
    com.amazonaws    aws-java-sdk-sqs    1.11.476    Apache License, Version 2.0
        com.amazonaws    aws-java-sdk-core    1.11.476    Apache License, Version 2.0
            com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
            com.fasterxml.jackson.dataformat    jackson-dataformat-cbor    2.6.7    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
            commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
            joda-time    joda-time    2.8.1    Apache 2
            org.apache.httpcomponents    httpclient    4.5.5    Apache License, Version 2.0
                commons-codec    commons-codec    1.10    Apache License, Version 2.0
                commons-logging    commons-logging    1.1.3    The Apache Software License, Version 2.0
                org.apache.httpcomponents    httpcore    4.4.9    Apache License, Version 2.0
            software.amazon.ion    ion-java    1.0.2    The Apache License, Version 2.0
        com.amazonaws    jmespath-java    1.11.476    Apache License, Version 2.0
            com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
    com.amazonaws    jmespath-java    1.11.476    Apache License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.6.7.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.6.0    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.6.7    The Apache Software License, Version 2.0
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Setup

The Flows and Sinks provided by this connector need a prepared AmazonSNSAsync client (for example an AmazonSNSAsyncClient built with AmazonSNSAsyncClientBuilder) to publish messages to a topic.

Scala
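import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.sns.{AmazonSNSAsync, AmazonSNSAsyncClientBuilder}

// "x" is a placeholder for the access key ID and the secret access key.
val credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
// `endpoint` is the URL of the SNS service endpoint to connect to.
implicit val awsSnsClient: AmazonSNSAsync = AmazonSNSAsyncClientBuilder
  .standard()
  .withCredentials(credentialsProvider)
  .withEndpointConfiguration(new EndpointConfiguration(endpoint, "eu-central-1"))
  .build()
// Shut the client down when the actor system terminates.
system.registerOnTermination(awsSnsClient.shutdown())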
Java
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNSAsync;
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder;

// "x" is a placeholder for the access key ID and the secret access key.
AWSCredentialsProvider credentialsProvider =
    new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));

// `endpoint` is the URL of the SNS service endpoint to connect to.
// The client is named `snsClient` to match the publishing examples that follow.
AmazonSNSAsync snsClient =
    AmazonSNSAsyncClientBuilder.standard()
        .withCredentials(credentialsProvider)
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration(endpoint, "eu-central-1"))
        .build();
// Shut the client down when the actor system terminates.
system.registerOnTermination(() -> snsClient.shutdown());

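We will also need an ActorSystem and an ActorMaterializer. Because the client setup above registers a termination hook on the ActorSystem, create the ActorSystem before the client.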

Scala
implicit val system: ActorSystem = ActorSystem()
implicit val mat: Materializer = ActorMaterializer()
Java
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);

This is all the preparation we need.

Publish messages to an SNS topic

Now we can publish a message to any SNS topic we have access to by providing the topic ARN to the SnsPublisher Flow or Sink factory method. In Scala the client is picked up implicitly; in Java it is passed to the factory method explicitly.
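
Since every factory method is driven by a topic ARN, it can help to check its shape before starting a stream. The following is a minimal sketch, assuming the usual topic ARN layout arn:<partition>:sns:<region>:<account-id>:<topic-name>. SnsTopicArn is a hypothetical helper written for this illustration; it is not part of Alpakka or the AWS SDK.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of Alpakka or the AWS SDK.
final class SnsTopicArn {
    final String partition; // e.g. "aws"
    final String region;    // e.g. "eu-central-1"
    final String accountId;
    final String topicName;

    private SnsTopicArn(String partition, String region, String accountId, String topicName) {
        this.partition = partition;
        this.region = region;
        this.accountId = accountId;
        this.topicName = topicName;
    }

    // Expects exactly six non-empty, colon-separated fields:
    // arn:<partition>:sns:<region>:<account-id>:<topic-name>
    static Optional<SnsTopicArn> parse(String arn) {
        if (arn == null) {
            return Optional.empty();
        }
        String[] parts = arn.split(":", -1); // -1 keeps trailing empty fields
        if (parts.length != 6 || !"arn".equals(parts[0]) || !"sns".equals(parts[2])) {
            return Optional.empty();
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                return Optional.empty();
            }
        }
        return Optional.of(new SnsTopicArn(parts[1], parts[3], parts[4], parts[5]));
    }
}
```

One possible use is to compare the parsed region with the region the client was configured for ("eu-central-1" in the setup above) and fail early if they differ.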

Using a Flow

Scala
Source
  .single("message")
  .via(SnsPublisher.flow(topicArn))
  .runWith(Sink.foreach(res => println(res.getMessageId)))

Source
  .single(new PublishRequest().withMessage("message"))
  .via(SnsPublisher.publishFlow(topicArn))
  .runWith(Sink.foreach(res => println(res.getMessageId)))

Source
  .single(new PublishRequest().withMessage("message").withTopicArn(topicArn))
  .via(SnsPublisher.publishFlow())
  .runWith(Sink.foreach(res => println(res.getMessageId)))
Java
Source.single("message")
    .via(SnsPublisher.createFlow(topicArn, snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.getMessageId())), materializer);

Source.single(new PublishRequest().withMessage("message"))
    .via(SnsPublisher.createPublishFlow(topicArn, snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.getMessageId())), materializer);

Source.single(new PublishRequest().withMessage("message").withTopicArn(topicArn))
    .via(SnsPublisher.createPublishFlow(snsClient))
    .runWith(Sink.foreach(res -> System.out.println(res.getMessageId())), materializer);

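Each variant publishes the messages from the source to an AWS SNS topic: either the topic passed to the factory method or, for the variant that takes no topic ARN, the topic set on each PublishRequest. After a message has been successfully published, a PublishResult is pushed downstream.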

Using a Sink

Scala
Source
  .single("message")
  .runWith(SnsPublisher.sink(topicArn))

Source
  .single(new PublishRequest().withMessage("message"))
  .runWith(SnsPublisher.publishSink(topicArn))

Source
  .single(new PublishRequest().withMessage("message").withTopicArn(topicArn))
  .runWith(SnsPublisher.publishSink())
Java
Source.single("message")
    .runWith(SnsPublisher.createSink(topicArn, snsClient), materializer);

Source.single(new PublishRequest().withMessage("message"))
    .runWith(SnsPublisher.createPublishSink(topicArn, snsClient), materializer);

Source.single(new PublishRequest().withMessage("message").withTopicArn(topicArn))
    .runWith(SnsPublisher.createPublishSink(snsClient), materializer);

The Sink variants publish the messages from the source to the AWS SNS topic in the same way, but as Sinks they do not pass the PublishResult on to a downstream stage.
