Consumer

The Alpakka JMS connector offers consuming JMS messages from topics or queues:

  • Read javax.jms.Messages from an Akka Streams source
  • Allow for client acknowledgement to the JMS broker
  • Allow for JMS transactions
  • Read raw JVM types from an Akka Streams Source

The JMS message model supports several types of message bodies in (see javax.jms.Message), which may be created directly from the Akka Stream elements, or in wrappers to access more advanced features.

Receiving messages

JmsConsumerJmsConsumer offers factory methods to consume JMS messages in a number of ways.

This examples shows how to listen to a JMS queue and emit javax.jms.Message elements into the stream.

The materialized value JmsConsumerControl is used to shut down the consumer (it is a Killswitch) and offers the possibility to inspect the connectivity state of the consumer.

Scala
val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(system, connectionFactory).withQueue("numbers")
)

val (control, result): (JmsConsumerControl, Future[immutable.Seq[String]]) =
  jmsSource
    .take(msgsIn.size)
    .map {
      case t: javax.jms.TextMessage => t.getText
      case other => sys.error(s"unexpected message type ${other.getClass}")
    }
    .toMat(Sink.seq)(Keep.both)
    .run()

control.shutdown()
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<javax.jms.Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

Pair<JmsConsumerControl, CompletionStage<List<String>>> controlAndResult =
    jmsSource
        .take(expectedMessages)
        .map(
            msg -> {
              if (msg instanceof TextMessage) {
                TextMessage t = (TextMessage) msg;
                return t.getText();
              } else
                throw new RuntimeException("unexpected message type " + msg.getClass());
            })
        .toMat(Sink.seq(), Keep.both())
        .run(materializer);

JmsConsumerControl control = controlAndResult.first();
control.shutdown();

Configure JMS consumers

To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Alpakka tests and all examples use Active MQ.

Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

The created ConnectionFactory is then used for the creation of the different JMS sources.

The JmsConsumerSettings factories allow for passing the actor system to read from the default alpakka.jms.consumer section, or you may pass a Config instance which is resolved to a section of the same structure.

Scala
val consumerConfig: Config = system.settings.config.getConfig(JmsConsumerSettings.configPath)
// reiterating defaults from reference.conf
val settings = JmsConsumerSettings(consumerConfig, connectionFactory)
  .withQueue("target-queue")
  .withCredentials(Credentials("username", "password"))
  .withConnectionRetrySettings(retrySettings)
  .withSessionCount(1)
  .withBufferSize(100)
  .withAckTimeout(1.second)
Java
Config consumerConfig = config.getConfig(JmsConsumerSettings.configPath());
JmsConsumerSettings settings =
    JmsConsumerSettings.create(consumerConfig, new ActiveMQConnectionFactory("broker-url"))
        .withTopic("message-topic")
        .withCredential(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSessionCount(10)
        .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
        .withSelector("Important = TRUE");

The Alpakka JMS consumer is configured via default settings in the HOCON config file section alpakka.jms.consumer in your application.conf, and settings may be tweaked in the code using the withXyz methods. On the second tab the section from reference.conf shows the structure to use for configuring multiple set-ups.

Table
Setting Description Default Value
connectionFactory Factory to use for creating JMS connections Must be set in code
destination Destination (queue or topic) to send JMS messages to Must be set in code
credentials JMS broker credentials Empty
connectionRetrySettings Retry characteristics if the connection failed to be established or is taking a long time. See Connection Retries
sessionCount Number of parallel sessions to use for receiving JMS messages. defaults to 1
bufferSize Maximum number of messages to prefetch before applying backpressure. 100
ackTimeout For use with JMS transactions, only: maximum time given to a message to be committed or rolled back. 1 second
selector JMS selector expression (see below) Empty
reference.conf
# Jms Consumer Settings
# sets default values
consumer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${alpakka.jms.connection-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for receiving JMS messages.
  session-count = 1
  # Buffer size for maximum number for messages read from JMS when there is no demand
  # (or acks are pending for acknowledged consumers).
  buffer-size = 100
  # JMS selector expression.
  # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
  # empty string for unset
  selector = "" # optional
  # Set an explicit acknowledge mode.
  # (Consumers have specific defaults.)
  # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
  # Allowed values: "off", "auto", "client", "duplicates-ok", "session", integer value
  acknowledge-mode = off
  # Timeout for acknowledge.
  # (Used by TX consumers.)
  ack-timeout = 1 second
  # For use with transactions, if true the stream fails if Alpakka rolls back the transaction
  # when `ack-timeout` is hit.
  fail-stream-on-ack-timeout = false
}

Broker specific destinations

To reach out to special features of the JMS broker, destinations can be created as CustomDestination which takes a factory method for creating destinations.

Scala
def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  val amqSession = session.asInstanceOf[ActiveMQSession]
  amqSession.createQueue(s"my-$destinationName")
}

val jmsSource: Source[javax.jms.Message, JmsConsumerControl] = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withDestination(CustomDestination("custom", createQueue("custom")))
)
Java
Function<javax.jms.Session, javax.jms.Destination> createQueue(String destinationName) {
  return (session) -> {
    ActiveMQSession amqSession = (ActiveMQSession) session;
    try {
      return amqSession.createQueue("my-" + destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}

        Source<Message, JmsConsumerControl> jmsSource =
            JmsConsumer.create(
                JmsConsumerSettings.create(system, connectionFactory)
                    .withDestination(new CustomDestination("custom", createQueue("custom"))));

Using JMS client acknowledgement

Client acknowledgement ensures a message is successfully received by the consumer and notifies the JMS broker for every message. Due to the threading details in JMS brokers, this special source is required (see the explanation below).

Scala
val jmsSource: Source[AckEnvelope, JmsConsumerControl] = JmsConsumer.ackSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withQueue("numbers")
)

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { ackEnvelope =>
      ackEnvelope.acknowledge()
      ackEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<akka.stream.alpakka.jms.AckEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.ackSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            envelope -> {
              envelope.acknowledge();
              return envelope.message();
            })
        .runWith(Sink.seq(), materializer);

The sessionCount parameter controls the number of JMS sessions to run in parallel.

Notes:

  • Using multiple sessions increases throughput, especially if acknowledging message by message is desired.
  • Messages may arrive out of order if sessionCount is larger than 1.
  • Message-by-message acknowledgement can be achieved by setting bufferSize to 0, thus disabling buffering. The outstanding messages before backpressure will be the sessionCount.
  • The default AcknowledgeMode is ClientAcknowledge but can be overridden to custom AcknowledgeModes, even implementation-specific ones by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.
Warning

Using a regular JmsConsumer with AcknowledgeMode.ClientAcknowledge and using message.acknowledge() from the stream is not compliant with the JMS specification and can cause issues for some message brokers. message.acknowledge() in many cases acknowledges the session and not the message itself, contrary to what the API makes you believe.

Use this JmsConsumer.ackSource as shown above instead.

Using JMS transactions

JMS transactions may be used with this connector. Be aware that transactions are a heavy-weight tool and may not perform very good.

Scala
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withSessionCount(5)
    .withAckTimeout(1.second)
    .withQueue("numbers")
)

val result: Future[immutable.Seq[javax.jms.Message]] =
  jmsSource
    .take(msgsIn.size)
    .map { txEnvelope =>
      txEnvelope.commit()
      txEnvelope.message
    }
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<akka.stream.alpakka.jms.TxEnvelope, JmsConsumerControl> jmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, connectionFactory)
            .withSessionCount(5)
            .withAckTimeout(Duration.ofSeconds(1))
            .withQueue("test"));

CompletionStage<List<javax.jms.Message>> result =
    jmsSource
        .take(msgsIn.size())
        .map(
            txEnvelope -> {
              txEnvelope.commit();
              return txEnvelope.message();
            })
        .runWith(Sink.seq(), materializer);

The sessionCount parameter controls the number of JMS sessions to run in parallel.

The ackTimeout parameter controls the maximum time given to a message to be committed or rolled back. If the message times out it will automatically be rolled back. This is to prevent stream from starvation if the application fails to commit or rollback a message, or if the message errors out and the stream is resumed by a decider.

Notes:

  • Higher throughput is achieved by increasing the sessionCount.
  • Messages will arrive out of order if sessionCount is larger than 1.
  • Buffering is not supported in transaction mode. The bufferSize is ignored.
  • The default AcknowledgeMode is SessionTransacted but can be overridden to custom AcknowledgeModes, even implementation-specific ones by setting the AcknowledgeMode in the JmsConsumerSettings when creating the stream.

Using JMS selectors

Create a javax.jms.Message source specifying a JMS selector expression: Verify that we are only receiving messages according to the selector:

Scala
val jmsSource = JmsConsumer(
  JmsConsumerSettings(consumerConfig, connectionFactory)
    .withQueue("numbers")
    .withSelector("IsOdd = TRUE")
)
Java
Source<Message, JmsConsumerControl> jmsSource =
    JmsConsumer.create(
        JmsConsumerSettings.create(system, connectionFactory)
            .withQueue("test")
            .withSelector("IsOdd = TRUE"));

Raw JVM type sources

Stream element type Alpakka source factory
String JmsConsumer.textSource
Array[Byte]byte[] JmsConsumer.bytesSource
Map[String, AnyRef]Map<String, Object> JmsConsumer.mapSource
Object (java.io.Serializable) JmsConsumer.objectSource

Text sources

The textSource emits the received message body as String:

Scala
val connectionFactory: javax.jms.ConnectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(url)
val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test")
)

val result: Future[immutable.Seq[String]] = jmsSource.take(in.size).runWith(Sink.seq)
Java
javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<List<String>> result =
    jmsSource.take(in.size()).runWith(Sink.seq(), materializer);

Byte array sources

The bytesSource emits the received message body as byte array:

Scala
val jmsSource: Source[Array[Byte], JmsConsumerControl] = JmsConsumer.bytesSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test")
)

val result: Future[Array[Byte]] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<byte[], JmsConsumerControl> jmsSource =
    JmsConsumer.bytesSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<byte[]> result = jmsSource.take(1).runWith(Sink.head(), materializer);

Map sources

The mapSource emits the received message body as Map[String, Object]Map<String, Object>:

Scala
val jmsSource: Source[Map[String, Any], JmsConsumerControl] = JmsConsumer.mapSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test")
)

val result: Future[immutable.Seq[Map[String, Any]]] =
  jmsSource
    .take(1)
    .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Source<Map<String, Object>, JmsConsumerControl> jmsSource =
    JmsConsumer.mapSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<Map<String, Object>> resultStage =
    jmsSource.take(1).runWith(Sink.head(), materializer);

Object sources

The objectSource emits the received message body as deserialized JVM instance. As serialization may be a security concern, JMS clients require special configuration to allow this. The example shows how to configure ActiveMQ connection factory to support serialization. See ActiveMQ Security for more information on this.

Scala
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setTrustedPackages(List(classOf[DummyObject].getPackage.getName).asJava)
val jmsSource: Source[java.io.Serializable, JmsConsumerControl] = JmsConsumer.objectSource(
  JmsConsumerSettings(system, connectionFactory).withQueue("test")
)

val result: Future[java.io.Serializable] =
  jmsSource
    .take(1)
    .runWith(Sink.head)
Java
ActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setTrustedPackages(
    Arrays.asList(DummyJavaTests.class.getPackage().getName()));

Source<java.io.Serializable, JmsConsumerControl> jmsSource =
    JmsConsumer.objectSource(
        JmsConsumerSettings.create(system, connectionFactory).withQueue("test"));

CompletionStage<java.io.Serializable> result =
    jmsSource.take(1).runWith(Sink.head(), materializer);

Request / Reply

The request / reply pattern can be implemented by streaming a JmsConsumerJmsConsumer to a JmsProducerJmsProducer, with a stage in between that extracts the ReplyTo and CorrelationID from the original message and adds them to the response.

Scala
val respondStreamControl: JmsConsumerControl =
  JmsConsumer(JmsConsumerSettings(system, connectionFactory).withQueue("test"))
    .collect {
      case message: TextMessage => JmsTextMessage(message)
    }
    .map { textMessage =>
      textMessage.headers.foldLeft(JmsTextMessage(textMessage.body.reverse)) {
        case (acc, rt: JmsReplyTo) => acc.to(rt.jmsDestination)
        case (acc, cId: JmsCorrelationId) => acc.withHeader(cId)
        case (acc, _) => acc
      }
    }
    .via {
      JmsProducer.flow(
        JmsProducerSettings(system, connectionFactory).withQueue("ignored")
      )
    }
    .to(Sink.ignore)
    .run()
Java
JmsConsumerControl respondStreamControl =
    JmsConsumer.create(
            JmsConsumerSettings.create(system, connectionFactory).withQueue("test"))
        .map(JmsMessageFactory::create)
        .collectType(JmsTextMessage.class)
        .map(
            textMessage -> {
              JmsTextMessage m = JmsTextMessage.create(reverse.apply(textMessage.body()));
              for (JmsHeader h : textMessage.getHeaders())
                if (h.getClass().equals(JmsReplyTo.class))
                  m = m.to(((JmsReplyTo) h).jmsDestination());
                else if (h.getClass().equals(JmsCorrelationId.class)) m = m.withHeader(h);
              return m;
            })
        .via(
            JmsProducer.flow(
                JmsProducerSettings.create(system, connectionFactory)
                    .withQueue("ignored")))
        .to(Sink.ignore())
        .run(materializer);
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.