IronMQ

The IronMQ connector provides an Akka stream source and sink to connect to the IronMQ queue.

IronMQ is a simple point-to-point queue, but it is possible to implement a fan-out semantic by configure the queue as push queue and set other queue as subscribers. More information about that could be found on IronMQ documentation

Project Info: Alpakka IronMQ
Artifact
com.lightbend.akka
akka-stream-alpakka-ironmq
9.0.0
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Scala versions2.13.12
JPMS module nameakka.stream.alpakka.ironmq
License
Readiness level
Since 0.8, 2017-05-05
Home pagehttps://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
val AkkaHttpVersion = "10.7.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-ironmq" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <akka.http.version>10.7.0</akka.http.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-ironmq_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-http_${scala.binary.version}</artifactId>
    <version>${akka.http.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  AkkaHttpVersion: "10.7.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-ironmq_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
  implementation "com.typesafe.akka:akka-http_${versions.ScalaBinary}:${versions.AkkaHttpVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.typesafe.akkaakka-http_2.1310.7.0
com.typesafe.akkaakka-stream_2.132.10.0
de.heikoseebergerakka-http-circe_2.131.39.2
org.scala-langscala-library2.13.12
Dependency tree
com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
    com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
        com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
            com.typesafe    config    1.4.3    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.slf4j    slf4j-api    2.0.16
    org.scala-lang    scala-library    2.13.12    Apache-2.0
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
de.heikoseeberger    akka-http-circe_2.13    1.39.2    Apache-2.0
    com.typesafe.akka    akka-http_2.13    10.7.0    BUSL-1.1
        com.typesafe.akka    akka-http-core_2.13    10.7.0    BUSL-1.1
            com.typesafe.akka    akka-parsing_2.13    10.7.0    BUSL-1.1
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        com.typesafe.akka    akka-pki_2.13    2.10.0    BUSL-1.1
            com.hierynomus    asn-one    0.6.0    The Apache License, Version 2.0
            com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
                com.typesafe    config    1.4.3    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.slf4j    slf4j-api    2.0.16
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    io.circe    circe-core_2.13    0.14.1    Apache 2.0
        io.circe    circe-numbers_2.13    0.14.1    Apache 2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.typelevel    cats-core_2.13    2.6.1    MIT
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.typelevel    cats-kernel_2.13    2.6.1    MIT
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.typelevel    simulacrum-scalafix-annotations_2.13    0.5.4    Apache 2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
    io.circe    circe-parser_2.13    0.14.1    Apache 2.0
        io.circe    circe-core_2.13    0.14.1    Apache 2.0
            io.circe    circe-numbers_2.13    0.14.1    Apache 2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.typelevel    cats-core_2.13    2.6.1    MIT
                org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.typelevel    cats-kernel_2.13    2.6.1    MIT
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.typelevel    simulacrum-scalafix-annotations_2.13    0.5.4    Apache 2.0
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
        io.circe    circe-jawn_2.13    0.14.1    Apache 2.0
            io.circe    circe-core_2.13    0.14.1    Apache 2.0
                io.circe    circe-numbers_2.13    0.14.1    Apache 2.0
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.scala-lang    scala-library    2.13.12    Apache-2.0
                org.typelevel    cats-core_2.13    2.6.1    MIT
                    org.scala-lang    scala-library    2.13.12    Apache-2.0
                    org.typelevel    cats-kernel_2.13    2.6.1    MIT
                        org.scala-lang    scala-library    2.13.12    Apache-2.0
                    org.typelevel    simulacrum-scalafix-annotations_2.13    0.5.4    Apache 2.0
                        org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.scala-lang    scala-library    2.13.12    Apache-2.0
            org.typelevel    jawn-parser_2.13    1.1.2    MIT
                org.scala-lang    scala-library    2.13.12    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.scala-lang    scala-library    2.13.12    Apache-2.0

Consumer

IronMQ can be used either in cloud or on-premise. Either way you need a authentication token and a project ID. These can be set in the application.conf file.

sourcealpakka.ironmq {

  // The IronMg endpoint. It may vary due to availability zone and region.
  endpoint = "https://mq-aws-eu-west-1-1.iron.io"

  credentials {

    // The IronMq project id
    // project-id =

    // The IronMq auth token
    // token =
  }

  consumer {

    // This is the max number of message to fetch from IronMq.
    buffer-max-size = 100

    // This is the threshold where fech other messages from IronMq
    buffer-min-size = 25

    // This is the time interval between each poll loop
    fetch-interval = 250 milliseconds

    // This is the amount of time the IronMq client will wait for a message to be available in the queue
    poll-timeout = 0

    // This is the amount of time a fetched message will be not available to other consumers
    reservation-timeout = 30 seconds

  }
}

The consumer is poll-based. It will poll every fetch-interval milliseconds, waiting for poll-timeout milliseconds to consume new messages and will push those downstream.

It supports both at-most-once and at-least-once semantics. In the first case the messages are deleted straight away after been fetched. In the latter case the messages piggy back a Committable object that should be used to commit the message. Committing the message will cause the message to be deleted from the queue.

At most once

The consumer source is instantiated using the IronMqConsumerIronMqConsumer.

Scala
sourceimport akka.stream.alpakka.ironmq.Message

val source: Source[Message, NotUsed] =
  IronMqConsumer.atMostOnceSource(queueName, ironMqSettings)

val receivedMessages: Future[immutable.Seq[Message]] = source
  .take(100)
  .runWith(Sink.seq)
Java
sourceimport akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Source<Message, NotUsed> source = IronMqConsumer.atMostOnceSource(queueName, ironMqSettings);

CompletionStage<List<Message>> receivedMessages =
    source.take(5).runWith(Sink.seq(), materializer);

At least once

To ensure at-least-once semantics, CommittableMessages need to be committed after successful processing which will delete the message from IronMQ.

Scala
sourceimport akka.stream.alpakka.ironmq.scaladsl.CommittableMessage
import akka.stream.alpakka.ironmq.Message

val source: Source[CommittableMessage, NotUsed] =
  IronMqConsumer.atLeastOnceSource(queueName, ironMqSettings)

val businessLogic: Flow[CommittableMessage, CommittableMessage, NotUsed] =
  Flow[CommittableMessage] // do something useful with the received messages

val receivedMessages: Future[immutable.Seq[Message]] = source
  .take(100)
  .via(businessLogic)
  .mapAsync(1)(m => m.commit().map(_ => m.message))
  .runWith(Sink.seq)
Java
sourceimport akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Source<CommittableMessage, NotUsed> source =
    IronMqConsumer.atLeastOnceSource(queueName, ironMqSettings);

Flow<CommittableMessage, CommittableMessage, NotUsed> businessLogic =
    Flow.of(CommittableMessage.class); // do something useful with the received messages

CompletionStage<List<Message>> receivedMessages =
    source
        .take(5)
        .via(businessLogic)
        .mapAsync(1, m -> m.commit().thenApply(d -> m.message()))
        .runWith(Sink.seq(), materializer);

Producer

The producer is very trivial at this time, it does not provide any batching mechanism, but sends messages to IronMq as soon as they arrive to the stage.

The producer is instantiated using the IronMqProducerIronMqProducer. It provides methods to obtain either a Flow[PushMessage, Messages.Id, NotUsed]Flow<PushMessage, Messages.Id, NotUsed> or a Sink[PushMessage, NotUsed]Sink<PushMessage, NotUsed>.

Flow

The PushMessage allows to specify the delay per individual message. The message expiration is set a queue level.

When using the Flow the returned Messages.IdsString contains the ID of the pushed message, that can be used to manipulate the message. For each PushMessage from the upstream you will have exactly one Message.IdString in downstream in the same order.

Scala
sourceimport akka.stream.alpakka.ironmq.{Message, PushMessage}

val messages: immutable.Seq[String] = (1 to messageCount).map(i => s"test-$i")
val producedIds: Future[immutable.Seq[Message.Id]] = Source(messages)
  .map(PushMessage(_))
  .via(IronMqProducer.flow(queueName, ironMqSettings))
  .runWith(Sink.seq)
Java
sourceimport akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

CompletionStage<List<String>> producedIds =
    Source.from(messages)
        .map(PushMessage::create)
        .via(IronMqProducer.flow(queueName, ironMqSettings))
        .runWith(Sink.seq(), materializer);

The producer also provides a committable aware Flow/Sink as Flow[(PushMessage, Committable), Message.Id, NotUsed]Flow<CommittablePushMessage<Committable>, String, NotUsed>. It can be used to consume a Flow from an IronMQ consumer or any other source that provides a commit mechanism.

Scala
sourceimport akka.stream.alpakka.ironmq.{Message, PushMessage}
import akka.stream.alpakka.ironmq.scaladsl.Committable

val pushAndCommit: Flow[(PushMessage, Committable), Message.Id, NotUsed] =
  IronMqProducer.atLeastOnceFlow(targetQueue, ironMqSettings)

val producedIds: Future[immutable.Seq[Message.Id]] = IronMqConsumer
  .atLeastOnceSource(sourceQueue, ironMqSettings)
  .take(messages.size)
  .map { committableMessage =>
    (PushMessage(committableMessage.message.body), committableMessage)
  }
  .via(pushAndCommit)
  .runWith(Sink.seq)
Java
sourceimport akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

Flow<CommittablePushMessage<CommittableMessage>, String, NotUsed> pushAndCommit =
    IronMqProducer.atLeastOnceFlow(targetQueue, ironMqSettings);

CompletionStage<List<String>> producedIds =
    IronMqConsumer.atLeastOnceSource(sourceQueue, ironMqSettings)
        .take(messages.size())
        .map(CommittablePushMessage::create)
        .via(pushAndCommit)
        .runWith(Sink.seq(), materializer);

Sink

Scala
sourceimport akka.stream.alpakka.ironmq.{Message, PushMessage}

val messages: immutable.Seq[String] = (1 to messageCount).map(i => s"test-$i")
val producedIds: Future[Done] = Source(messages)
  .map(PushMessage(_))
  .runWith(IronMqProducer.sink(queueName, ironMqSettings))
Java
sourceimport akka.stream.alpakka.ironmq.*;
import akka.stream.alpakka.ironmq.javadsl.*;

CompletionStage<Done> producedIds =
    Source.from(messages)
        .map(PushMessage::create)
        .runWith(IronMqProducer.sink(queueName, ironMqSettings), materializer);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.