AMQP
The AMQP connector provides Akka Stream sources and sinks to connect to AMQP 0.9.1 servers (RabbitMQ, OpenAMQ, etc.).
AMQP 1.0 is currently not supported (Qpid, ActiveMQ, Solace, etc.).
Project Info: Alpakka AMQP | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-amqp 2.0.2 |
JDK versions | Adopt OpenJDK 8, Adopt OpenJDK 11 |
Scala versions | 2.12.11, 2.11.12, 2.13.3 |
JPMS module name | akka.stream.alpakka.amqp |
License | |
Readiness level | Since 0.1, 2016-11-11 |
Home page | https://doc.akka.io/docs/alpakka/current |
API documentation | |
Forums | |
Release notes | In the documentation |
Issues | GitHub issues |
Sources | https://github.com/akka/alpakka |
Artifacts
- sbt
    val AkkaVersion = "2.5.31"
    libraryDependencies ++= Seq(
      "com.lightbend.akka" %% "akka-stream-alpakka-amqp" % "2.0.2",
      "com.typesafe.akka" %% "akka-stream" % AkkaVersion
    )
- Maven
    <properties>
      <akka.version>2.5.31</akka.version>
      <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependency>
      <groupId>com.lightbend.akka</groupId>
      <artifactId>akka-stream-alpakka-amqp_${scala.binary.version}</artifactId>
      <version>2.0.2</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-stream_${scala.binary.version}</artifactId>
      <version>${akka.version}</version>
    </dependency>
- Gradle
    versions += [
      AkkaVersion: "2.5.31",
      ScalaBinary: "2.12"
    ]
    dependencies {
      compile group: 'com.lightbend.akka', name: "akka-stream-alpakka-amqp_${versions.ScalaBinary}", version: '2.0.2'
      compile group: 'com.typesafe.akka', name: "akka-stream_${versions.ScalaBinary}", version: versions.AkkaVersion
    }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.rabbitmq | amqp-client | 5.3.0 |
com.typesafe.akka | akka-stream_2.12 | 2.5.31 |
org.scala-lang | scala-library | 2.12.11 |
- Dependency tree
    com.rabbitmq amqp-client 5.3.0
        org.slf4j slf4j-api 1.7.25
    com.typesafe.akka akka-stream_2.12 2.5.31
        com.typesafe.akka akka-actor_2.12 2.5.31
            com.typesafe config 1.3.3
            org.scala-lang.modules scala-java8-compat_2.12 0.8.0
                org.scala-lang scala-library 2.12.11
            org.scala-lang scala-library 2.12.11
        com.typesafe.akka akka-protobuf_2.12 2.5.31
            org.scala-lang scala-library 2.12.11
        com.typesafe ssl-config-core_2.12 0.3.8
            com.typesafe config 1.3.3
            org.scala-lang.modules scala-parser-combinators_2.12 1.1.2
                org.scala-lang scala-library 2.12.11
            org.scala-lang scala-library 2.12.11
        org.reactivestreams reactive-streams 1.0.2
        org.scala-lang scala-library 2.12.11
    org.scala-lang scala-library 2.12.11
Connecting to server
All the AMQP connectors are configured using an AmqpConnectionProvider and a list of Declaration.
There are several types of AmqpConnectionProvider:
- AmqpLocalConnectionProvider, which connects to the default localhost. It creates a new connection for each stage.
- AmqpUriConnectionProvider, which connects to the given AMQP URI. It creates a new connection for each stage.
- AmqpDetailsConnectionProvider, which supports more fine-grained configuration. It creates a new connection for each stage.
- AmqpConnectionFactoryConnectionProvider, which takes a raw ConnectionFactory. It creates a new connection for each stage.
- AmqpCachedConnectionProvider, which receives any other provider as a parameter and caches the connection it provides so that it is used in all stages. By default it closes the connection when the last stage using the provider stops. Optionally, it takes a boolean automaticRelease parameter; when this is set to false, the connection is not released automatically and the user has to release it explicitly.
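The release behaviour of the cached provider can be pictured as reference counting. The following is a minimal, library-free sketch of that idea, not Alpakka's implementation: `CachingProviderModel`, `acquire`, `release` and `forceRelease` are hypothetical names, and a boolean flag stands in for a real AMQP connection.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a caching connection provider (not Alpakka code).
final class CachingProviderModel {
    private final boolean automaticRelease;
    private final AtomicInteger connectionsOpened = new AtomicInteger();
    private int users = 0;        // stages currently using the cached connection
    private boolean open = false; // whether the cached connection is open

    CachingProviderModel(boolean automaticRelease) {
        this.automaticRelease = automaticRelease;
    }

    // A stage asks for the connection; a new one is opened only if none is cached.
    synchronized void acquire() {
        if (!open) {
            open = true;
            connectionsOpened.incrementAndGet();
        }
        users++;
    }

    // A stage stops; with automaticRelease the last user closes the connection.
    synchronized void release() {
        if (users == 0) throw new IllegalStateException("release without acquire");
        users--;
        if (users == 0 && automaticRelease) {
            open = false;
        }
    }

    // Explicit release by the user, needed when automaticRelease is false.
    synchronized void forceRelease() {
        open = false;
    }

    synchronized boolean isOpen() { return open; }

    int connectionsOpened() { return connectionsOpened.get(); }
}
```

In this model two stages that call `acquire` share a single connection. With `automaticRelease` set to `true` the connection closes on the second `release`; with `false` it stays open until `forceRelease` is called.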
Sending messages
First define a queue name and the declaration of the queue that the messages will be sent to.
- Scala
    val queueName = "amqp-conn-it-spec-simple-queue-" + System.currentTimeMillis()
    val queueDeclaration = QueueDeclaration(queueName)
- Java
    final String queueName = "amqp-conn-it-test-simple-queue-" + System.currentTimeMillis();
    final QueueDeclaration queueDeclaration = QueueDeclaration.create(queueName);
Here we used the QueueDeclaration configuration class to create a queue declaration.
With flow
As with a sink, the first step is to create a Flow that accepts WriteMessages and forwards their content to the AMQP server. The flow emits WriteResults that report the publication result (see below for a summary of the delivery guarantees of the different flow variants).
AmqpFlow is a collection of factory methods that facilitates the creation of flows. Here we create a confirming flow (AmqpFlow.withConfirm in Scala, AmqpFlow.createWithConfirm in Java), so each element has to be wrapped in a WriteMessage before it is passed to the flow.
The last step is to materialize and run the flow we have created.
- Scala
    val settings = AmqpWriteSettings(connectionProvider)
      .withRoutingKey(queueName)
      .withDeclaration(queueDeclaration)
      .withBufferSize(10)
      .withConfirmationTimeout(200.millis)

    val amqpFlow: Flow[WriteMessage, WriteResult, Future[Done]] =
      AmqpFlow.withConfirm(settings)

    val input = Vector("one", "two", "three", "four", "five")
    val result: Future[Seq[WriteResult]] =
      Source(input)
        .map(message => WriteMessage(ByteString(message)))
        .via(amqpFlow)
        .runWith(Sink.seq)
- Java
    final AmqpWriteSettings settings =
        AmqpWriteSettings.create(connectionProvider)
            .withRoutingKey(queueName)
            .withDeclaration(queueDeclaration)
            .withBufferSize(10)
            .withConfirmationTimeout(Duration.ofMillis(200));

    final Flow<WriteMessage, WriteResult, CompletionStage<Done>> amqpFlow =
        AmqpFlow.createWithConfirm(settings);

    final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
    final List<WriteResult> result =
        Source.from(input)
            .map(message -> WriteMessage.create(ByteString.fromString(message)))
            .via(amqpFlow)
            .runWith(Sink.seq(), materializer)
            .toCompletableFuture()
            .get();
The AMQP flow variants offer different delivery and ordering guarantees:
AMQP flow factory | Description |
---|---|
AmqpFlow.apply | The most basic type of flow. It does not impose delivery guarantees; messages are published in a fire-and-forget manner. Emitted results always have confirmed set to true. |
AmqpFlow.withConfirm | Variant that uses asynchronous confirmations. The maximum number of messages simultaneously waiting for confirmation before backpressure is signaled is configured with the bufferSize parameter. Emitted results preserve the order of the messages pulled from upstream; because of that restriction this flow is expected to be slightly less efficient than its unordered counterpart. |
AmqpFlow.withConfirmUnordered | The same as AmqpFlow.withConfirm except for the ordering guarantee: results are emitted downstream as soon as a confirmation is received, so there is no ordering guarantee of any sort. |
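The difference between the ordered and unordered confirming variants can be illustrated with a small simulation. This is a simplified, library-free sketch under stated assumptions, not the connector's implementation: `ConfirmOrderModel` and its methods are hypothetical names, messages are identified by the position in which they were published (0, 1, 2, ...), and `confirmations` lists those positions in the order the broker happens to confirm them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of result emission order (not Alpakka code).
final class ConfirmOrderModel {

    // Unordered variant: each result goes downstream as soon as its confirmation arrives.
    static List<Integer> emitUnordered(List<Integer> confirmations) {
        return new ArrayList<>(confirmations);
    }

    // Ordered variant: a confirmed result waits until every earlier message is confirmed too.
    static List<Integer> emitOrdered(int published, List<Integer> confirmations) {
        Deque<Integer> awaiting = new ArrayDeque<>();
        for (int position = 0; position < published; position++) {
            awaiting.add(position);
        }
        Set<Integer> confirmed = new HashSet<>();
        List<Integer> emitted = new ArrayList<>();
        for (int position : confirmations) {
            confirmed.add(position);
            // Release the longest confirmed prefix of the publish order.
            while (!awaiting.isEmpty() && confirmed.contains(awaiting.peekFirst())) {
                emitted.add(awaiting.pollFirst());
            }
        }
        return emitted;
    }
}
```

With four published messages confirmed in the order 2, 0, 1, 3, the unordered model emits 2, 0, 1, 3, while the ordered model holds message 2 back until 0 and 1 are confirmed and emits 0, 1, 2, 3. That waiting is the cost the table attributes to the ordered variant.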
For the FlowWithContext counterparts of the above flows, see AmqpFlowWithContext.
AmqpFlow.withConfirm and AmqpFlow.withConfirmUnordered are implemented using RabbitMQ's extension to the AMQP protocol (Publisher Confirms); therefore they are not intended to work with other AMQP brokers.
With sink
Create a sink that accepts ByteStrings and forwards them to the AMQP server.
AmqpSink is a collection of factory methods that facilitates the creation of sinks. Here we create a simple sink, which means that we can pass ByteStrings to the sink instead of wrapping the data in WriteMessages.
The last step is to materialize and run the sink we have created.
- Scala
    val amqpSink: Sink[ByteString, Future[Done]] =
      AmqpSink.simple(
        AmqpWriteSettings(connectionProvider)
          .withRoutingKey(queueName)
          .withDeclaration(queueDeclaration)
      )

    val input = Vector("one", "two", "three", "four", "five")
    val writing: Future[Done] =
      Source(input)
        .map(s => ByteString(s))
        .runWith(amqpSink)
- Java
    final Sink<ByteString, CompletionStage<Done>> amqpSink =
        AmqpSink.createSimple(
            AmqpWriteSettings.create(connectionProvider)
                .withRoutingKey(queueName)
                .withDeclaration(queueDeclaration));

    final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
    CompletionStage<Done> writing =
        Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);
Receiving messages
Create a source using the same queue declaration as before.
The bufferSize parameter controls the maximum number of messages to prefetch from the AMQP server.
Run the source and take the same number of messages as we previously sent to it.
- Scala
    val amqpSource: Source[ReadResult, NotUsed] =
      AmqpSource.atMostOnceSource(
        NamedQueueSourceSettings(connectionProvider, queueName)
          .withDeclaration(queueDeclaration)
          .withAckRequired(false),
        bufferSize = 10
      )

    val result: Future[immutable.Seq[ReadResult]] =
      amqpSource
        .take(input.size)
        .runWith(Sink.seq)
- Java
    final Integer bufferSize = 10;
    final Source<ReadResult, NotUsed> amqpSource =
        AmqpSource.atMostOnceSource(
            NamedQueueSourceSettings.create(connectionProvider, queueName)
                .withDeclaration(queueDeclaration)
                .withAckRequired(false),
            bufferSize);

    final CompletionStage<List<ReadResult>> result =
        amqpSource.take(input.size()).runWith(Sink.seq(), materializer);
This is how you send messages to and receive messages from an AMQP server using this connector.
Using Pub/Sub
Instead of sending messages directly to queues, it is possible to send messages to an exchange and then instruct the AMQP server what to do with incoming messages. We are going to use the fanout exchange type, which broadcasts messages to multiple consumers. We do that by using an exchange declaration for the sink and for all of the sources.
- Scala
    val exchangeName = "amqp-conn-it-spec-pub-sub-" + System.currentTimeMillis()
    val exchangeDeclaration = ExchangeDeclaration(exchangeName, "fanout")
- Java
    final String exchangeName = "amqp-conn-it-test-pub-sub-" + System.currentTimeMillis();
    final ExchangeDeclaration exchangeDeclaration =
        ExchangeDeclaration.create(exchangeName, "fanout");
The sink for the exchange is created in a very similar way.
- Scala
    val amqpSink = AmqpSink.simple(
      AmqpWriteSettings(connectionProvider)
        .withExchange(exchangeName)
        .withDeclaration(exchangeDeclaration)
    )
- Java
    final Sink<ByteString, CompletionStage<Done>> amqpSink =
        AmqpSink.createSimple(
            AmqpWriteSettings.create(connectionProvider)
                .withExchange(exchangeName)
                .withDeclaration(exchangeDeclaration));
For the source, we are going to create multiple sources and merge them using Akka Streams operators.
- Scala
    val fanoutSize = 4

    val mergedSources = (0 until fanoutSize).foldLeft(Source.empty[(Int, String)]) {
      case (source, fanoutBranch) =>
        source.merge(
          AmqpSource
            .atMostOnceSource(
              TemporaryQueueSourceSettings(
                connectionProvider,
                exchangeName
              ).withDeclaration(exchangeDeclaration),
              bufferSize = 1
            )
            .map(msg => (fanoutBranch, msg.bytes.utf8String))
        )
    }
- Java
    final int fanoutSize = 4;
    final int bufferSize = 1;

    Source<Pair<Integer, String>, NotUsed> mergedSources = Source.empty();
    for (int i = 0; i < fanoutSize; i++) {
      final int fanoutBranch = i;
      mergedSources =
          mergedSources.merge(
              AmqpSource.atMostOnceSource(
                      TemporaryQueueSourceSettings.create(connectionProvider, exchangeName)
                          .withDeclaration(exchangeDeclaration),
                      bufferSize)
                  .map(msg -> Pair.create(fanoutBranch, msg.bytes().utf8String())));
    }
We merge all sources into one and add the index of the source to all incoming messages, so we can distinguish which source the incoming message came from.
Such a sink and source can be started the same way as in the previous example.
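To make the broadcast behaviour concrete, here is a small in-memory sketch of a fanout exchange with one temporary queue per consumer. It is only a model under stated assumptions and involves no broker or connector code: `FanoutModel` and its methods are hypothetical names, and the merged output is produced branch by branch, whereas a real stream merge interleaves elements in no fixed order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of a fanout exchange (not Alpakka or RabbitMQ code).
final class FanoutModel {
    private final List<List<String>> queues = new ArrayList<>();

    // Each consumer binds its own temporary queue; the returned index identifies the branch.
    int bindTemporaryQueue() {
        queues.add(new ArrayList<>());
        return queues.size() - 1;
    }

    // A fanout exchange copies every published message to every bound queue.
    void publish(String message) {
        for (List<String> queue : queues) {
            queue.add(message);
        }
    }

    // Merge all branches, tagging each message with the index of the branch it came from.
    List<String> mergedWithBranchIndex() {
        List<String> merged = new ArrayList<>();
        for (int branch = 0; branch < queues.size(); branch++) {
            for (String message : queues.get(branch)) {
                merged.add(branch + ":" + message);
            }
        }
        return merged;
    }
}
```

With four bound queues and two published messages, the merged result holds eight tagged entries, one copy of each message per branch. This is why the example above attaches the branch index to every element.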
Using RabbitMQ as an RPC mechanism
If you have remote workers that you want to incorporate into a stream, you can do so using the RabbitMQ RPC workflow.
- Scala
    val amqpRpcFlow = AmqpRpcFlow.simple(
      AmqpWriteSettings(connectionProvider)
        .withRoutingKey(queueName)
        .withDeclaration(queueDeclaration)
    )

    val (rpcQueueF: Future[String], probe: TestSubscriber.Probe[ByteString]) =
      Source(input)
        .map(s => ByteString(s))
        .viaMat(amqpRpcFlow)(Keep.right)
        .toMat(TestSink.probe)(Keep.both)
        .run()
- Java
    final Flow<ByteString, ByteString, CompletionStage<String>> amqpRpcFlow =
        AmqpRpcFlow.createSimple(
            AmqpWriteSettings.create(connectionProvider)
                .withRoutingKey(queueName)
                .withDeclaration(queueDeclaration),
            1);

    Pair<CompletionStage<String>, TestSubscriber.Probe<ByteString>> result =
        Source.from(input)
            .map(ByteString::fromString)
            .viaMat(amqpRpcFlow, Keep.right())
            .toMat(TestSink.probe(system), Keep.both())
            .run(materializer);
Acknowledging messages downstream
Committable sources return CommittableReadResult, which wraps the ReadResult and exposes the methods ack and nack.
Use ack to acknowledge the message back to RabbitMQ. ack takes an optional boolean parameter multiple, which indicates whether you are acknowledging only the individual message or all the messages up to it.
Use nack to reject a message. Apart from the multiple argument, nack takes another optional boolean parameter that indicates whether the item should be requeued.
- Scala
    val amqpSource = AmqpSource.committableSource(
      NamedQueueSourceSettings(connectionProvider, queueName)
        .withDeclaration(queueDeclaration),
      bufferSize = 10
    )

    val result: Future[immutable.Seq[ReadResult]] = amqpSource
      .mapAsync(1)(businessLogic)
      .mapAsync(1)(cm => cm.ack().map(_ => cm.message))
      .take(input.size)
      .runWith(Sink.seq)

    val nackedResults: Future[immutable.Seq[ReadResult]] = amqpSource
      .mapAsync(1)(businessLogic)
      .take(input.size)
      .mapAsync(1)(cm => cm.nack(multiple = false, requeue = true).map(_ => cm.message))
      .runWith(Sink.seq)
- Java
    final Integer bufferSize = 10;
    final Source<CommittableReadResult, NotUsed> amqpSource =
        AmqpSource.committableSource(
            NamedQueueSourceSettings.create(connectionProvider, queueName)
                .withDeclaration(queueDeclaration),
            bufferSize);

    final CompletionStage<List<ReadResult>> result =
        amqpSource
            .mapAsync(1, this::businessLogic)
            .mapAsync(1, cm -> cm.ack(/* multiple */ false).thenApply(unused -> cm.message()))
            .take(input.size())
            .runWith(Sink.seq(), materializer);

    final CompletionStage<List<ReadResult>> nackedResults =
        amqpSource
            .take(input.size())
            .mapAsync(1, this::businessLogic)
            .mapAsync(
                1,
                cm ->
                    cm.nack(/* multiple */ false, /* requeue */ true)
                        .thenApply(unused -> cm.message()))
            .runWith(Sink.seq(), materializer);
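The effect of the multiple and requeue flags described above can be sketched with a small model of a channel's outstanding deliveries. This is an illustration under stated assumptions rather than connector or broker code: `UnackedDeliveriesModel` and its methods are hypothetical names, and deliveries are identified by increasing numeric tags.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical model of a channel's unacknowledged deliveries (not Alpakka or RabbitMQ code).
final class UnackedDeliveriesModel {
    private final NavigableSet<Long> unacked = new TreeSet<>();
    private final List<Long> requeued = new ArrayList<>();

    void deliver(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    // multiple = false settles one delivery;
    // multiple = true settles every outstanding delivery up to and including the tag.
    void ack(long deliveryTag, boolean multiple) {
        settle(deliveryTag, multiple);
    }

    // Rejects deliveries the same way; requeue = true records them for redelivery.
    void nack(long deliveryTag, boolean multiple, boolean requeue) {
        List<Long> rejected = settle(deliveryTag, multiple);
        if (requeue) {
            requeued.addAll(rejected);
        }
    }

    private List<Long> settle(long deliveryTag, boolean multiple) {
        List<Long> settled = new ArrayList<>();
        if (multiple) {
            NavigableSet<Long> upTo = unacked.headSet(deliveryTag, true);
            settled.addAll(upTo);
            upTo.clear(); // the view is backed by the set, so this also removes them from unacked
        } else if (unacked.remove(deliveryTag)) {
            settled.add(deliveryTag);
        }
        return settled;
    }

    List<Long> outstanding() { return new ArrayList<>(unacked); }

    List<Long> requeuedTags() { return new ArrayList<>(requeued); }
}
```

For deliveries tagged 1 to 5, `ack(3, true)` leaves 4 and 5 outstanding, and a following `nack(4, false, true)` leaves only 5 outstanding and marks 4 for redelivery.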