AMQP

The AMQP connector provides Akka Stream sources and sinks to connect to AMQP 0.9.1 servers (RabbitMQ, OpenAMQ, etc.).

AMQP 1.0 is currently not supported (Qpid, ActiveMQ, Solace, etc.).

Project Info: Alpakka AMQP
Artifact    com.lightbend.akka    akka-stream-alpakka-amqp    1.0.2
JDK versions    OpenJDK 8
Scala versions    2.12.7, 2.11.12, 2.13.0-M5
JPMS module name    akka.stream.alpakka.amqp
License
Readiness level    Since 0.1, 2016-11-11
Home page    https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes    In the documentation
Issues    GitHub issues
Sources    https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-amqp" % "1.0.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-amqp_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-amqp_2.12', version: '1.0.2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.rabbitmq    amqp-client    5.3.0    ASL 2.0
com.typesafe.akka    akka-stream_2.12    2.5.22    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.rabbitmq    amqp-client    5.3.0    ASL 2.0
    org.slf4j    slf4j-api    1.7.25    MIT License
com.typesafe.akka    akka-stream_2.12    2.5.22    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.22    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.22    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Connecting to server

All the AMQP connectors are configured using an AmqpConnectionProvider and a list of Declarations.

There are several types of AmqpConnectionProvider:

  • AmqpLocalConnectionProvider which connects to the default localhost. It creates a new connection for each stage.
  • AmqpUriConnectionProvider which connects to the given AMQP URI. It creates a new connection for each stage.
  • AmqpDetailsConnectionProvider which supports more fine-grained configuration. It creates a new connection for each stage.
  • AmqpConnectionFactoryConnectionProvider which takes a raw ConnectionFactory. It creates a new connection for each stage.
  • AmqpCachedConnectionProvider which receives any other provider as a parameter and caches the connection it provides so that it can be used in all stages. By default it closes the connection when the last stage using the provider stops. Optionally, it takes an automaticRelease boolean parameter; when this is set to false the connection is not released automatically and the user has to release it explicitly.
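
The caching behaviour of the last provider in this list can be pictured as reference counting. The following is a toy model in plain Java, not the Alpakka implementation: `FakeConnection`, `CachedProviderSketch` and the methods `acquire`, `stageStopped` and `closeNow` are hypothetical names introduced only for illustration, and the assumption is that each stage acquires the connection when it starts and reports back when it stops.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a broker connection; it only tracks open/closed. */
final class FakeConnection {
  private boolean open = true;

  boolean isOpen() {
    return open;
  }

  void close() {
    open = false;
  }
}

/** Toy model of a caching provider: all users share one counted connection. */
final class CachedProviderSketch {
  private final Supplier<FakeConnection> underlying;
  private final boolean automaticRelease;
  private FakeConnection cached; // null while no connection is cached
  private int users; // number of stages currently holding the connection

  CachedProviderSketch(Supplier<FakeConnection> underlying, boolean automaticRelease) {
    this.underlying = underlying;
    this.automaticRelease = automaticRelease;
  }

  /** Called when a stage starts: reuse the cached connection or create one. */
  synchronized FakeConnection acquire() {
    if (cached == null) {
      cached = underlying.get();
    }
    users++;
    return cached;
  }

  /** Called when a stage stops: closes only for the last user, and only if release is automatic. */
  synchronized void stageStopped() {
    if (users > 0) {
      users--;
    }
    if (users == 0 && automaticRelease) {
      closeNow();
    }
  }

  /** Explicit release by the user, needed when automaticRelease is false. */
  synchronized void closeNow() {
    if (cached != null) {
      cached.close();
      cached = null;
    }
  }
}
```

In this sketch two stages that call `acquire()` receive the same connection object. With automatic release the connection closes when the second stage stops; with `automaticRelease` set to false it stays open until `closeNow()` is called.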

Sending messages

First define a queue name and the declaration of the queue that the messages will be sent to.

Scala
val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
val queueDeclaration = QueueDeclaration(queueName)
Java
final String queueName = "amqp-conn-it-test-simple-queue-" + System.currentTimeMillis();
final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName);

Here we used the QueueDeclaration configuration class to create a queue declaration.

Create a sink that accepts and forwards ByteStrings to the AMQP server.

AmqpSink is a collection of factory methods that facilitate the creation of sinks. Here we created a simple sink, which means that we can pass ByteStrings to the sink directly instead of wrapping the data in WriteMessages.

The last step is to materialize and run the sink we have created.

Scala
val amqpSink: Sink[ByteString, Future[Done]] =
  AmqpSink.simple(
    AmqpWriteSettings(connectionProvider)
      .withRoutingKey(queueName)
      .withDeclaration(queueDeclaration)
  )

val input = Vector("one", "two", "three", "four", "five")
val writing: Future[Done] =
  Source(input)
    .map(s => ByteString(s))
    .runWith(amqpSink)
Java
final Sink<ByteString, CompletionStage<Done>> amqpSink =
    AmqpSink.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withRoutingKey(queueName)
            .withDeclaration(queueDeclaration));

final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
CompletionStage<Done> writing =
    Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);

Receiving messages

Create a source using the same queue declaration as before.

The bufferSize parameter controls the maximum number of messages to prefetch from the AMQP server.

Run the source and take the same number of messages as we previously sent to it.

Scala
val amqpSource: Source[ReadResult, NotUsed] =
  AmqpSource.atMostOnceSource(
    NamedQueueSourceSettings(connectionProvider, queueName)
      .withDeclaration(queueDeclaration)
      .withAckRequired(false),
    bufferSize = 10
  )

val result: Future[immutable.Seq[ReadResult]] =
  amqpSource
    .take(input.size)
    .runWith(Sink.seq)
Java
final Integer bufferSize = 10;
final Source<ReadResult, NotUsed> amqpSource =
    AmqpSource.atMostOnceSource(
        NamedQueueSourceSettings.create(connectionProvider, queueName)
            .withDeclaration(queueDeclaration)
            .withAckRequired(false),
        bufferSize);

final CompletionStage<List<ReadResult>> result =
    amqpSource.take(input.size()).runWith(Sink.seq(), materializer);

This is how you send messages to and receive messages from an AMQP server using this connector.

Using Pub/Sub

Instead of sending messages directly to queues, it is possible to send messages to an exchange and then instruct the AMQP server what to do with incoming messages. We are going to use the fanout exchange type, which broadcasts each message to multiple consumers. We do that by using an exchange declaration for the sink and for all of the sources.

Scala
val exchangeName = "amqp-conn-it-spec-pub-sub-" + System.currentTimeMillis()
val exchangeDeclaration = ExchangeDeclaration(exchangeName, "fanout")
Java
final String exchangeName = "amqp-conn-it-test-pub-sub-" + System.currentTimeMillis();
final ExchangeDeclaration exchangeDeclaration =
    ExchangeDeclaration.create(exchangeName, "fanout");

The sink for the exchange is created in a very similar way.

Scala
val amqpSink = AmqpSink.simple(
  AmqpWriteSettings(connectionProvider)
    .withExchange(exchangeName)
    .withDeclaration(exchangeDeclaration)
)
Java
final Sink<ByteString, CompletionStage<Done>> amqpSink =
    AmqpSink.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withExchange(exchangeName)
            .withDeclaration(exchangeDeclaration));

For the source, we are going to create multiple sources and merge them using Akka Streams operators.

Scala
val fanoutSize = 4

val mergedSources = (0 until fanoutSize).foldLeft(Source.empty[(Int, String)]) {
  case (source, fanoutBranch) =>
    source.merge(
      AmqpSource
        .atMostOnceSource(
          TemporaryQueueSourceSettings(
            connectionProvider,
            exchangeName
          ).withDeclaration(exchangeDeclaration),
          bufferSize = 1
        )
        .map(msg => (fanoutBranch, msg.bytes.utf8String))
    )
}
Java
final int fanoutSize = 4;
final int bufferSize = 1;

Source<Pair<Integer, String>, NotUsed> mergedSources = Source.empty();
for (int i = 0; i < fanoutSize; i++) {
  final int fanoutBranch = i;
  mergedSources =
      mergedSources.merge(
          AmqpSource.atMostOnceSource(
                  TemporaryQueueSourceSettings.create(connectionProvider, exchangeName)
                      .withDeclaration(exchangeDeclaration),
                  bufferSize)
              .map(msg -> Pair.create(fanoutBranch, msg.bytes().utf8String())));
}

We merge all sources into one and add the index of the source to all incoming messages, so we can distinguish which source the incoming message came from.

Such a sink and such sources can be started the same way as in the previous example.
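
The broadcast semantics of the fanout exchange used in this section can be pictured with a small in-memory model. This is a sketch in plain Java that does not talk to a broker: `FanoutSketch`, `bindTemporaryQueue` and `publish` are hypothetical names, and the only assumption is the standard fanout rule that every queue bound to the exchange receives its own copy of each published message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy in-memory model of a fanout exchange; no broker is involved. */
final class FanoutSketch {
  // Every queue currently bound to the exchange.
  private final List<Queue<String>> boundQueues = new ArrayList<>();

  /** Models one consumer creating its own temporary queue bound to the exchange. */
  Queue<String> bindTemporaryQueue() {
    Queue<String> queue = new ArrayDeque<>();
    boundQueues.add(queue);
    return queue;
  }

  /** Fanout ignores routing keys: each bound queue gets a copy of the message. */
  void publish(String message) {
    for (Queue<String> queue : boundQueues) {
      queue.add(message);
    }
  }
}
```

With four bound queues, as in the fanoutSize of the example above, publishing one message leaves one copy in each of the four queues, which is why the merged stream sees every message once per branch.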

Using RabbitMQ as an RPC mechanism

If you have remote workers that you want to incorporate into a stream, you can do so using the RabbitMQ RPC workflow.

Scala
val amqpRpcFlow = AmqpRpcFlow.simple(
  AmqpWriteSettings(connectionProvider).withRoutingKey(queueName).withDeclaration(queueDeclaration)
)

val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) = Source(input)
  .map(s => ByteString(s))
  .viaMat(amqpRpcFlow)(Keep.right)
  .toMat(TestSink.probe)(Keep.both)
  .run
Java
final Flow<ByteString, ByteString, CompletionStage<String>> amqpRpcFlow =
    AmqpRpcFlow.createSimple(
        AmqpWriteSettings.create(connectionProvider)
            .withRoutingKey(queueName)
            .withDeclaration(queueDeclaration),
        1);

Pair<CompletionStage<String>, TestSubscriber.Probe<ByteString>> result =
    Source.from(input)
        .map(ByteString::fromString)
        .viaMat(amqpRpcFlow, Keep.right())
        .toMat(TestSink.probe(system), Keep.both())
        .run(materializer);

Acknowledging messages downstream

Committable sources return CommittableReadResult, which wraps the ReadResult and exposes the methods ack and nack.

Use ack to acknowledge the message back to RabbitMQ. ack takes an optional boolean parameter multiple indicating whether you are acknowledging the individual message or all the messages up to it.

Use nack to reject a message. Apart from the multiple argument, nack takes another optional boolean parameter indicating whether the item should be requeued or not.

Scala
val amqpSource = AmqpSource.committableSource(
  NamedQueueSourceSettings(connectionProvider, queueName)
    .withDeclaration(queueDeclaration),
  bufferSize = 10
)

val result: Future[immutable.Seq[ReadResult]] = amqpSource
  .mapAsync(1)(businessLogic)
  .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
  .take(input.size)
  .runWith(Sink.seq)

val nackedResults: Future[immutable.Seq[ReadResult]] = amqpSource
  .mapAsync(1)(businessLogic)
  .take(input.size)
  .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm.message))
  .runWith(Sink.seq)
Java
final Integer bufferSize = 10;
final Source<CommittableReadResult, NotUsed> amqpSource =
    AmqpSource.committableSource(
        NamedQueueSourceSettings.create(connectionProvider, queueName)
            .withDeclaration(queueDeclaration),
        bufferSize);

final CompletionStage<List<ReadResult>> result =
    amqpSource
        .mapAsync(1, this::businessLogic)
        .mapAsync(1, cm -> cm.ack(/* multiple */ false).thenApply(unused -> cm.message()))
        .take(input.size())
        .runWith(Sink.seq(), materializer);

final CompletionStage<List<ReadResult>> nackedResults =
    amqpSource
        .take(input.size())
        .mapAsync(1, this::businessLogic)
        .mapAsync(
            1,
            cm ->
                cm.nack(/* multiple */ false, /* requeue */ true)
                    .thenApply(unused -> cm.message()))
        .runWith(Sink.seq(), materializer);
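
The effect of the multiple and requeue flags described in this section can be pictured with a toy model of a channel's unacknowledged deliveries. This is a sketch in plain Java, not connector or client code: `UnackedSketch` and its methods are hypothetical names, and it assumes the usual AMQP 0.9.1 rule that deliveries carry increasing tags and that multiple = true covers every outstanding delivery up to and including the given tag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of the unacknowledged deliveries on one channel. */
final class UnackedSketch {
  private final TreeMap<Long, String> unacked = new TreeMap<>();
  private final List<String> requeued = new ArrayList<>();
  private long nextTag = 1;

  /** Delivers a message and returns its delivery tag. */
  long deliver(String body) {
    long tag = nextTag++;
    unacked.put(tag, body);
    return tag;
  }

  /** Acknowledges one delivery, or all deliveries up to and including the tag. */
  void ack(long tag, boolean multiple) {
    select(tag, multiple).clear();
  }

  /** Rejects deliveries; rejected messages return to the queue when requeue is true. */
  void nack(long tag, boolean multiple, boolean requeue) {
    NavigableMap<Long, String> selected = select(tag, multiple);
    if (requeue) {
      requeued.addAll(selected.values());
    }
    selected.clear();
  }

  /** Returns a live view of the affected deliveries; clearing it removes them. */
  private NavigableMap<Long, String> select(long tag, boolean multiple) {
    return multiple ? unacked.headMap(tag, true) : unacked.subMap(tag, true, tag, true);
  }

  List<Long> unackedTags() {
    return new ArrayList<>(unacked.keySet());
  }

  List<String> requeuedBodies() {
    return new ArrayList<>(requeued);
  }
}
```

For example, after three deliveries an `ack` on the second tag with multiple = true leaves only the third delivery outstanding, while a `nack` with multiple = false and requeue = true affects a single delivery and puts its message back on the queue.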