IronMQ
The IronMQ connector provides an Akka stream source and sink to connect to the IronMQ queue.
IronMQ is a simple point-to-point queue, but it is possible to implement fan-out semantics by configuring the queue as a push queue and setting other queues as subscribers. More information can be found in the IronMQ documentation.
Project Info: Alpakka IronMQ | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-ironmq 9.0.0 |
JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
Scala versions | 2.13.12 |
JPMS module name | akka.stream.alpakka.ironmq |
License | |
Readiness level | Since 0.8, 2017-05-05 |
Home page | https://doc.akka.io/libraries/alpakka/current |
API documentation | |
Forums | |
Release notes | GitHub releases |
Issues | Github issues |
Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
- sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
- Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
- Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}
Additionally, add the dependencies as below.
- sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-ironmq" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion
)
- Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-ironmq_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
- Gradle
def versions = [
  AkkaVersion: "2.10.0",
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-ironmq_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-http_2.13 | 10.7.0 |
com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
de.heikoseeberger | akka-http-circe_2.13 | 1.39.2 |
org.scala-lang | scala-library | 2.13.12 |
- Dependency tree
com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
  com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
    com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
    com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
      com.typesafe config 1.4.3 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
    org.slf4j slf4j-api 2.0.16
  org.scala-lang scala-library 2.13.12 Apache-2.0
com.typesafe.akka akka-stream_2.13 2.10.0 BUSL-1.1
  com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
    com.typesafe config 1.4.3 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  com.typesafe.akka akka-protobuf-v3_2.13 2.10.0 BUSL-1.1
  org.reactivestreams reactive-streams 1.0.4 MIT-0
  org.scala-lang scala-library 2.13.12 Apache-2.0
de.heikoseeberger akka-http-circe_2.13 1.39.2 Apache-2.0
  com.typesafe.akka akka-http_2.13 10.7.0 BUSL-1.1
    com.typesafe.akka akka-http-core_2.13 10.7.0 BUSL-1.1
      com.typesafe.akka akka-parsing_2.13 10.7.0 BUSL-1.1
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    com.typesafe.akka akka-pki_2.13 2.10.0 BUSL-1.1
      com.hierynomus asn-one 0.6.0 The Apache License, Version 2.0
      com.typesafe.akka akka-actor_2.13 2.10.0 BUSL-1.1
        com.typesafe config 1.4.3 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.slf4j slf4j-api 2.0.16
    org.scala-lang scala-library 2.13.12 Apache-2.0
  io.circe circe-core_2.13 0.14.1 Apache 2.0
    io.circe circe-numbers_2.13 0.14.1 Apache 2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
    org.typelevel cats-core_2.13 2.6.1 MIT
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.typelevel cats-kernel_2.13 2.6.1 MIT
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.typelevel simulacrum-scalafix-annotations_2.13 0.5.4 Apache 2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
  io.circe circe-parser_2.13 0.14.1 Apache 2.0
    io.circe circe-core_2.13 0.14.1 Apache 2.0
      io.circe circe-numbers_2.13 0.14.1 Apache 2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.typelevel cats-core_2.13 2.6.1 MIT
        org.scala-lang scala-library 2.13.12 Apache-2.0
        org.typelevel cats-kernel_2.13 2.6.1 MIT
          org.scala-lang scala-library 2.13.12 Apache-2.0
        org.typelevel simulacrum-scalafix-annotations_2.13 0.5.4 Apache 2.0
          org.scala-lang scala-library 2.13.12 Apache-2.0
    io.circe circe-jawn_2.13 0.14.1 Apache 2.0
      io.circe circe-core_2.13 0.14.1 Apache 2.0
        io.circe circe-numbers_2.13 0.14.1 Apache 2.0
          org.scala-lang scala-library 2.13.12 Apache-2.0
        org.scala-lang scala-library 2.13.12 Apache-2.0
        org.typelevel cats-core_2.13 2.6.1 MIT
          org.scala-lang scala-library 2.13.12 Apache-2.0
          org.typelevel cats-kernel_2.13 2.6.1 MIT
            org.scala-lang scala-library 2.13.12 Apache-2.0
          org.typelevel simulacrum-scalafix-annotations_2.13 0.5.4 Apache 2.0
            org.scala-lang scala-library 2.13.12 Apache-2.0
      org.scala-lang scala-library 2.13.12 Apache-2.0
      org.typelevel jawn-parser_2.13 1.1.2 MIT
        org.scala-lang scala-library 2.13.12 Apache-2.0
    org.scala-lang scala-library 2.13.12 Apache-2.0
  org.scala-lang scala-library 2.13.12 Apache-2.0
org.scala-lang scala-library 2.13.12 Apache-2.0
Consumer
IronMQ can be used either in the cloud or on-premises. Either way you need an authentication token and a project ID. These can be set in the application.conf file.
alpakka.ironmq {
  // The IronMq endpoint. It may vary due to availability zone and region.
  endpoint = "https://mq-aws-eu-west-1-1.iron.io"
  credentials {
    // The IronMq project id
    // project-id =
    // The IronMq auth token
    // token =
  }
  consumer {
    // This is the max number of messages to fetch from IronMq.
    buffer-max-size = 100
    // This is the threshold below which more messages are fetched from IronMq
    buffer-min-size = 25
    // This is the time interval between each poll loop
    fetch-interval = 250 milliseconds
    // This is the amount of time the IronMq client will wait for a message to be available in the queue
    poll-timeout = 0
    // This is the amount of time a fetched message will not be available to other consumers
    reservation-timeout = 30 seconds
  }
}
The consumer is poll-based. It polls every fetch-interval milliseconds, waits up to poll-timeout milliseconds for new messages to become available, and pushes them downstream.
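The role of the two buffer settings in this polling loop can be pictured with a small model in plain Java. It is only a sketch: it assumes, based on the configuration comments, that a poll is skipped while the local buffer holds at least buffer-min-size messages and otherwise requests enough messages to fill the buffer up to buffer-max-size. The class PollBufferSketch and its methods are hypothetical names, not part of the connector.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy model of a poll-based buffer; all names here are illustrative only.
class PollBufferSketch {
    private final int bufferMinSize;
    private final int bufferMaxSize;
    private final Deque<String> buffer = new ArrayDeque<>();

    PollBufferSketch(int bufferMinSize, int bufferMaxSize) {
        this.bufferMinSize = bufferMinSize;
        this.bufferMaxSize = bufferMaxSize;
    }

    // Number of messages the next poll should request; 0 means "skip this poll".
    int messagesToFetch() {
        return buffer.size() < bufferMinSize ? bufferMaxSize - buffer.size() : 0;
    }

    // Store the messages returned by a poll.
    void onFetched(List<String> messages) {
        buffer.addAll(messages);
    }

    // Hand one buffered message downstream, or null if the buffer is empty.
    String poll() {
        return buffer.pollFirst();
    }

    int size() {
        return buffer.size();
    }
}
```

With the values shown in the configuration (25 and 100), an empty buffer asks for 100 messages, and under this assumption no further fetch is requested until fewer than 25 remain.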
It supports both at-most-once and at-least-once semantics. In the first case the messages are deleted immediately after being fetched. In the latter case each message carries a Committable object that should be used to commit the message. Committing the message causes it to be deleted from the queue.
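The difference between the two semantics can be illustrated with a toy in-memory queue in plain Java. This is a sketch under simplifying assumptions: the class and method names are hypothetical, and reservation timeouts are not modelled. It only shows when a message leaves the queue in each mode.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy in-memory queue; every name here is illustrative, not the connector's API.
class DeliverySemanticsSketch {
    private final Map<Integer, String> queue = new LinkedHashMap<>();
    private int nextId = 0;

    void push(String body) {
        queue.put(nextId++, body);
    }

    int remaining() {
        return queue.size();
    }

    // At-most-once: the message is deleted as soon as it is fetched,
    // so a failure during processing loses it.
    String fetchAtMostOnce() {
        Iterator<Integer> ids = queue.keySet().iterator();
        return ids.hasNext() ? queue.remove(ids.next()) : null;
    }

    // At-least-once: the message stays in the queue until commit() is called,
    // so a failure before the commit allows it to be delivered again.
    ToyCommittable fetchAtLeastOnce() {
        Iterator<Integer> ids = queue.keySet().iterator();
        if (!ids.hasNext()) {
            return null;
        }
        Integer id = ids.next();
        return new ToyCommittable(id, queue.get(id));
    }

    // Hypothetical stand-in for a committable message.
    final class ToyCommittable {
        final Integer id;
        final String body;

        ToyCommittable(Integer id, String body) {
            this.id = id;
            this.body = body;
        }

        // Deletes the message from the queue.
        void commit() {
            queue.remove(id);
        }
    }
}
```

In the at-least-once case the processing step must therefore tolerate seeing the same message more than once.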
At most once
The consumer source is instantiated using the IronMqConsumer.
- Scala
import akka.stream.alpakka.ironmq.Message

val source: Source[Message, NotUsed] =
  IronMqConsumer.atMostOnceSource(queueName, ironMqSettings)

val receivedMessages: Future[immutable.Seq[Message]] = source
  .take(100)
  .runWith(Sink.seq)
- Java
import akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Source<Message, NotUsed> source =
    IronMqConsumer.atMostOnceSource(queueName, ironMqSettings);

CompletionStage<List<Message>> receivedMessages =
    source.take(5).runWith(Sink.seq(), materializer);
At least once
To ensure at-least-once semantics, CommittableMessages need to be committed after successful processing, which deletes the message from IronMQ.
- Scala
import akka.stream.alpakka.ironmq.scaladsl.CommittableMessage
import akka.stream.alpakka.ironmq.Message

val source: Source[CommittableMessage, NotUsed] =
  IronMqConsumer.atLeastOnceSource(queueName, ironMqSettings)

val businessLogic: Flow[CommittableMessage, CommittableMessage, NotUsed] =
  Flow[CommittableMessage] // do something useful with the received messages

val receivedMessages: Future[immutable.Seq[Message]] = source
  .take(100)
  .via(businessLogic)
  .mapAsync(1)(m => m.commit().map(_ => m.message))
  .runWith(Sink.seq)
- Java
import akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Source<CommittableMessage, NotUsed> source =
    IronMqConsumer.atLeastOnceSource(queueName, ironMqSettings);

Flow<CommittableMessage, CommittableMessage, NotUsed> businessLogic =
    Flow.of(CommittableMessage.class); // do something useful with the received messages

CompletionStage<List<Message>> receivedMessages =
    source
        .take(5)
        .via(businessLogic)
        .mapAsync(1, m -> m.commit().thenApply(d -> m.message()))
        .runWith(Sink.seq(), materializer);
Producer
The producer is currently very simple: it does not provide any batching mechanism, but sends each message to IronMQ as soon as it arrives at the stage.
The producer is instantiated using the IronMqProducer. It provides methods to obtain either a Flow[PushMessage, Message.Id, NotUsed] (Scala) or Flow<PushMessage, String, NotUsed> (Java), or a Sink[PushMessage, Future[Done]] (Scala) or Sink<PushMessage, CompletionStage<Done>> (Java).
Flow
PushMessage allows a delay to be specified for each individual message. Message expiration is set at the queue level.
When using the Flow, the returned Message.Id (Scala) or String (Java) contains the ID of the pushed message, which can be used to manipulate the message. For each PushMessage from upstream, exactly one Message.Id (String in Java) is emitted downstream, in the same order.
- Scala
import akka.stream.alpakka.ironmq.{Message, PushMessage}

val messages: immutable.Seq[String] = (1 to messageCount).map(i => s"test-$i")

val producedIds: Future[immutable.Seq[Message.Id]] = Source(messages)
  .map(PushMessage(_))
  .via(IronMqProducer.flow(queueName, ironMqSettings))
  .runWith(Sink.seq)
- Java
import akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

CompletionStage<List<String>> producedIds =
    Source.from(messages)
        .map(PushMessage::create)
        .via(IronMqProducer.flow(queueName, ironMqSettings))
        .runWith(Sink.seq(), materializer);
The producer also provides committable-aware Flow and Sink variants, for example Flow[(PushMessage, Committable), Message.Id, NotUsed] (Scala) or Flow<CommittablePushMessage<Committable>, String, NotUsed> (Java). They can be attached to an IronMQ consumer or to any other source that provides a commit mechanism.
- Scala
import akka.stream.alpakka.ironmq.{Message, PushMessage}
import akka.stream.alpakka.ironmq.scaladsl.Committable

val pushAndCommit: Flow[(PushMessage, Committable), Message.Id, NotUsed] =
  IronMqProducer.atLeastOnceFlow(targetQueue, ironMqSettings)

val producedIds: Future[immutable.Seq[Message.Id]] = IronMqConsumer
  .atLeastOnceSource(sourceQueue, ironMqSettings)
  .take(messages.size)
  .map { committableMessage =>
    (PushMessage(committableMessage.message.body), committableMessage)
  }
  .via(pushAndCommit)
  .runWith(Sink.seq)
- Java
import akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Flow<CommittablePushMessage<CommittableMessage>, String, NotUsed> pushAndCommit =
    IronMqProducer.atLeastOnceFlow(targetQueue, ironMqSettings);

CompletionStage<List<String>> producedIds =
    IronMqConsumer.atLeastOnceSource(sourceQueue, ironMqSettings)
        .take(messages.size())
        .map(CommittablePushMessage::create)
        .via(pushAndCommit)
        .runWith(Sink.seq(), materializer);
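Why the ordering matters in a committable-aware flow can be shown with a small plain-Java sketch. It assumes, as the name pushAndCommit suggests but the text above does not state explicitly, that the message is pushed to the target first and the source is committed only once the push has succeeded. PushThenCommitSketch, ToyCommit and pushThenCommit are hypothetical names, not part of the connector.

```java
import java.util.function.Function;

// Illustrative only: pushes first, commits second.
class PushThenCommitSketch {

    // Hypothetical stand-in for a handle that commits a consumed message.
    interface ToyCommit {
        void commit();
    }

    // Pushes the body and commits only after the push has returned an ID.
    // If push throws, commit() is never reached, so the source message is
    // kept and can be delivered again (at-least-once).
    static String pushThenCommit(String body, ToyCommit committable, Function<String, String> push) {
        String id = push.apply(body);
        committable.commit();
        return id;
    }
}
```

Under this ordering a failure between the push and the commit can produce a duplicate in the target queue, but never a lost message.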
Sink
- Scala
import akka.stream.alpakka.ironmq.{Message, PushMessage}

val messages: immutable.Seq[String] = (1 to messageCount).map(i => s"test-$i")

val producedIds: Future[Done] = Source(messages)
  .map(PushMessage(_))
  .runWith(IronMqProducer.sink(queueName, ironMqSettings))
- Java
import akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

CompletionStage<Done> producedIds =
    Source.from(messages)
        .map(PushMessage::create)
        .runWith(IronMqProducer.sink(queueName, ironMqSettings), materializer);