Producer

The Alpakka Jakarta Messaging connector offers three ways to produce messages to topics or queues:

  • JVM types to an Akka Streams Sink
  • JmsMessage sub-types to an Akka Streams Sink or Flow (using JmsProducer.sink or JmsProducer.flow)
  • JmsEnvelope sub-types to an Akka Streams Flow (using JmsProducer.flexiFlow) to support pass-throughs

The JMS message model supports several types of message bodies (see jakarta.jms.Message). These may be created directly from the Akka Stream elements, or wrapped in JmsMessage or JmsEnvelope sub-types to access more advanced features.

Stream element type | Alpakka producer
String | JmsProducer.textSink
byte[] | JmsProducer.bytesSink
Map<String, Object> | JmsProducer.mapSink
Object (java.io.Serializable) | JmsProducer.objectSink
JmsTextMessage | JmsProducer.sink or JmsProducer.flow
JmsByteMessage | JmsProducer.sink or JmsProducer.flow
JmsByteStringMessage | JmsProducer.sink or JmsProducer.flow
JmsMapMessage | JmsProducer.sink or JmsProducer.flow
JmsObjectMessage | JmsProducer.sink or JmsProducer.flow
JmsEnvelope<PassThrough> with instances JmsPassThrough, JmsTextMessagePassThrough, JmsByteMessagePassThrough, JmsByteStringMessagePassThrough, JmsMapMessagePassThrough, JmsObjectMessagePassThrough | JmsProducer.flexiFlow

Configure JMS producers

To connect to the JMS broker, first define an appropriate jakarta.jms.ConnectionFactory. Here we’re using ActiveMQ Artemis.

Scala
val connectionFactory: jakarta.jms.ConnectionFactory =
  new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(url)
Java
jakarta.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

The created ConnectionFactory is then used for the creation of the different JMS sinks or sources (see below).

A JmsMessage sub-type sink

Wrap the messages you want to send in a JmsMessage sub-type (a case class in Scala), and optionally set message-specific properties or headers. JmsProducer contains factory methods that create sinks according to the message type.

Scala
val jmsSink: Sink[JmsTextMessage, Future[Done]] = JmsProducer.sink(
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("numbers")
)

val finished: Future[Done] =
  Source(immutable.Seq("Message A", "Message B"))
    .map(JmsTextMessage(_))
    .runWith(jmsSink)
Java
Sink<JmsTextMessage, CompletionStage<Done>> jmsSink =
    JmsProducer.sink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

CompletionStage<Done> finished =
    Source.from(Arrays.asList("Message A", "Message B"))
        .map(JmsTextMessage::create)
        .runWith(jmsSink, system);

Setting JMS message properties

For every JmsMessage you can set JMS message properties.

Scala
val msgsIn = (1 to 10).toList.map { n =>
  akka.stream.alpakka.jakartajms
    .JmsTextMessage(n.toString)
    .withProperty("Number", n)
    .withProperty("IsOdd", n % 2 == 1)
    .withProperty("IsEven", n % 2 == 0)
}
Java
JmsTextMessage message =
    akka.stream.alpakka.jakartajms.JmsTextMessage.create(n.toString())
        .withProperty("Number", n)
        .withProperty("IsOdd", n % 2 == 1)
        .withProperty("IsEven", n % 2 == 0);

Setting JMS message header attributes

For every JmsMessage you can also set JMS message headers.

Scala
val msgsIn = (1 to 10).toList.map { n =>
  JmsTextMessage(n.toString)
    .withHeader(JmsType("type"))
    .withHeader(JmsCorrelationId("correlationId"))
    .withHeader(JmsReplyTo.queue("test-reply"))
    .withHeader(JmsTimeToLive(FiniteDuration(999, TimeUnit.SECONDS)))
    .withHeader(JmsPriority(2))
    .withHeader(JmsDeliveryMode(DeliveryMode.NON_PERSISTENT))
}
Java
List<JmsTextMessage> msgsIn =
    createTestMessageList().stream()
        .map(
            jmsTextMessage ->
                jmsTextMessage
                    .withHeader(JmsType.create("type"))
                    .withHeader(JmsCorrelationId.create("correlationId"))
                    .withHeader(JmsReplyTo.queue("test-reply"))
                    .withHeader(JmsTimeToLive.create(999, TimeUnit.SECONDS))
                    .withHeader(JmsPriority.create(2))
                    .withHeader(JmsDeliveryMode.create(DeliveryMode.NON_PERSISTENT)))
        .collect(Collectors.toList());

Raw JVM type sinks

Text sinks

Create a sink that accepts Strings and forwards them as JMS text messages to the JMS provider:

Scala
val connectionFactory: jakarta.jms.ConnectionFactory =
  new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(url)

val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test")
)

val in = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val streamCompletion: Future[Done] =
  Source(in)
    .runWith(jmsSink)
Java
jakarta.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<String> in = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
CompletionStage<Done> finished = Source.from(in).runWith(jmsSink, system);

Byte array sinks

Create a sink that accepts byte arrays and forwards them as JMS bytes messages to the JMS provider:

Scala
val jmsSink: Sink[Array[Byte], Future[Done]] = JmsProducer.bytesSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test")
)
val in: Array[Byte] = "ThisIsATest".getBytes(Charset.forName("UTF-8"))
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<byte[], CompletionStage<Done>> jmsSink =
    JmsProducer.bytesSink(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

byte[] in = "ThisIsATest".getBytes(Charset.forName("UTF-8"));
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Map message sink

Create a sink that accepts maps and forwards them as JMS map messages to the JMS provider:

Scala
val jmsSink: Sink[Map[String, Any], Future[Done]] = JmsProducer.mapSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test")
)

val input = List(
  Map[String, Any](
    "string" -> "value",
    "int value" -> 42,
    "double value" -> 43.toDouble,
    "short value" -> 7.toShort,
    "boolean value" -> true,
    "long value" -> 7.toLong,
    "bytearray" -> "AStringAsByteArray".getBytes(Charset.forName("UTF-8")),
    "byte" -> 1.toByte
  )
)

val streamCompletion: Future[Done] =
  Source(input)
    .runWith(jmsSink)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Sink<Map<String, Object>, CompletionStage<Done>> jmsSink =
    JmsProducer.mapSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

Map<String, Object> in = new HashMap<>();
in.put("string value", "value");
in.put("int value", 42);
in.put("double value", 43.0);
in.put("short value", (short) 7);
in.put("boolean value", true);
in.put("long value", 7L);
in.put("bytearray", "AStringAsByteArray".getBytes(Charset.forName("UTF-8")));
in.put("byte", (byte) 1);

CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Object sinks

Create and configure the ActiveMQ Artemis connection factory to support serialization (see Controlling JMS ObjectMessage deserialization for more information). Then create a sink that accepts Serializable objects and forwards them as JMS object messages to the JMS provider:

Scala
val connectionFactory = connFactory.asInstanceOf[ActiveMQConnectionFactory]
connectionFactory.setDeserializationAllowList(classOf[DummyObject].getPackage.getName)

val jmsSink: Sink[Serializable, Future[Done]] = JmsProducer.objectSink(
  JmsProducerSettings(system, connectionFactory).withQueue("test")
)
val in = DummyObject("ThisIsATest")
val streamCompletion: Future[Done] =
  Source
    .single(in)
    .runWith(jmsSink)
Java
ActiveMQConnectionFactory connectionFactory =
    (ActiveMQConnectionFactory) server.createConnectionFactory();
connectionFactory.setDeserializationAllowList(
    DummyJavaTests.class.getPackage().getName());

Sink<java.io.Serializable, CompletionStage<Done>> jmsSink =
    JmsProducer.objectSink(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

java.io.Serializable in = new DummyJavaTests("javaTest");
CompletionStage<Done> finished = Source.single(in).runWith(jmsSink, system);

Sending messages as a Flow

The producer can also act as a flow, in order to publish messages in the middle of stream processing. For example, you can ensure that a message is persisted to the queue before subsequent processing.

Scala
val flow: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
  JmsProducer.flow(
    JmsProducerSettings(system, connectionFactory)
      .withQueue("test")
  )

val input: immutable.Seq[JmsTextMessage] =
  (1 to 100).map(i => JmsTextMessage(i.toString))

val result: Future[Seq[JmsMessage]] = Source(input)
  .via(flow)
  .runWith(Sink.seq)
Java
ConnectionFactory connectionFactory = server.createConnectionFactory();

Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flow =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = createTestMessageList();

CompletionStage<List<JmsTextMessage>> result =
    Source.from(input).via(flow).runWith(Sink.seq(), system);

Sending messages with per-message destinations

It is also possible to define message destinations per message:

Scala
val flowSink: Flow[JmsMessage, JmsMessage, JmsProducerStatus] =
  JmsProducer.flow(
    JmsProducerSettings(system, connectionFactory).withQueue("test")
  )

val input = (1 to 100).map { i =>
  val queueName = if (i % 2 == 0) "even" else "odd"
  JmsTextMessage(i.toString).toQueue(queueName)
}
Source(input).via(flowSink).runWith(Sink.ignore)
Java
Flow<JmsTextMessage, JmsTextMessage, JmsProducerStatus> flowSink =
    JmsProducer.flow(
        JmsProducerSettings.create(system, connectionFactory).withQueue("test"));

List<JmsTextMessage> input = new ArrayList<>();
for (Integer n : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
  String queueName = (n % 2 == 0) ? "even" : "odd";
  input.add(JmsTextMessage.create(n.toString()).toQueue(queueName));
}

Source.from(input).via(flowSink).runWith(Sink.seq(), system);

When no destination is defined on the message, the destination given in the producer settings is used.
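
The fallback rule can be pictured with a small, library-free model. This is only a sketch: the `Message` class and the `resolveQueue` and `route` helpers are hypothetical names introduced for illustration and are not part of the Alpakka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model only; these types are hypothetical and not part of Alpakka. */
final class DestinationFallbackModel {

  /** A message body with an optional per-message queue name (null when unset). */
  static final class Message {
    final String body;
    final String queue;

    Message(String body, String queue) {
      this.body = body;
      this.queue = queue;
    }
  }

  /** Returns the per-message queue if present, otherwise the queue from the settings. */
  static String resolveQueue(Message message, String settingsQueue) {
    return message.queue != null ? message.queue : settingsQueue;
  }

  /** Groups message bodies by the queue each one would be sent to. */
  static Map<String, List<String>> route(List<Message> messages, String settingsQueue) {
    Map<String, List<String>> queues = new LinkedHashMap<>();
    for (Message message : messages) {
      queues
          .computeIfAbsent(resolveQueue(message, settingsQueue), q -> new ArrayList<>())
          .add(message.body);
    }
    return queues;
  }

  public static void main(String[] args) {
    List<Message> input = new ArrayList<>();
    input.add(new Message("1", "odd"));
    input.add(new Message("2", "even"));
    input.add(new Message("3", null)); // no per-message destination set
    System.out.println(route(input, "test"));
  }
}
```

In this model, the third message ends up in the "test" queue because it carries no destination of its own.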

Passing context through the producer

In some use cases, it is useful to pass context information through the producer (e.g. for acknowledging or committing messages after sending to JMS). For this, JmsProducer.flexiFlow accepts implementations of JmsEnvelope, which it passes through:

  • JmsPassThrough
  • JmsTextMessagePassThrough
  • JmsByteMessagePassThrough
  • JmsByteStringMessagePassThrough
  • JmsMapMessagePassThrough
  • JmsObjectMessagePassThrough
Scala
val jmsProducer: Flow[JmsEnvelope[String], JmsEnvelope[String], JmsProducerStatus] =
  JmsProducer.flexiFlow[String](
    JmsProducerSettings(system, connectionFactory).withQueue("test")
  )

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in: immutable.Seq[JmsTextMessagePassThrough[String]] =
  data.map(t => JmsTextMessage(t).withPassThrough(t))

val result = Source(in)
  .via(jmsProducer)
  .map(_.passThrough) // extract the value passed through
  .runWith(Sink.seq)
Java
Flow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsTextMessage.create(s, passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);

There are two kinds of envelope: one containing a message to send to JMS, and one containing only a value to pass through. The latter allows elements to flow through the producer without producing any new messages to JMS. This is primarily useful when committing offsets back to Kafka, or when acknowledging JMS messages after sending the outcome of processing them back to JMS.

Scala
val jmsProducer = JmsProducer.flexiFlow[String](
  JmsProducerSettings(producerConfig, connectionFactory).withQueue("topic")
)

val data = List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k")
val in = data.map(t => JmsPassThrough(t))

val result = Source(in).via(jmsProducer).map(_.passThrough).runWith(Sink.seq)
Java
Flow<JmsEnvelope<String>, JmsEnvelope<String>, JmsProducerStatus> jmsProducer =
    JmsProducer.flexiFlow(
        JmsProducerSettings.create(producerConfig, connectionFactory).withQueue("test"));

List<String> data = Arrays.asList("a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k");
List<JmsEnvelope<String>> input = new ArrayList<>();
for (String s : data) {
  String passThrough = s;
  input.add(JmsPassThrough.create(passThrough));
}

CompletionStage<List<String>> result =
    Source.from(input)
        .via(jmsProducer)
        .map(JmsEnvelope::passThrough)
        .runWith(Sink.seq(), system);
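
To make the difference between the two envelope kinds concrete, here is a minimal, library-free sketch. The `Envelope` class and its factory methods are hypothetical stand-ins invented for this illustration; they only mimic the behaviour described above and are not the connector's JmsEnvelope types.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model only; not the connector's implementation. */
final class PassThroughModel {

  /** Simplified envelope: an optional message body plus a pass-through value. */
  static final class Envelope<P> {
    final String body; // null means "nothing to send"
    final P passThrough;

    private Envelope(String body, P passThrough) {
      this.body = body;
      this.passThrough = passThrough;
    }

    /** Envelope that carries a message to send and a pass-through value. */
    static <P> Envelope<P> withMessage(String body, P passThrough) {
      return new Envelope<>(body, passThrough);
    }

    /** Envelope that carries only a pass-through value. */
    static <P> Envelope<P> passThroughOnly(P passThrough) {
      return new Envelope<>(null, passThrough);
    }
  }

  /** Bodies that were "sent" to the simulated broker. */
  final List<String> sent = new ArrayList<>();

  /** Sends envelopes that carry a message and emits every pass-through value in order. */
  <P> List<P> run(List<Envelope<P>> input) {
    List<P> emitted = new ArrayList<>();
    for (Envelope<P> envelope : input) {
      if (envelope.body != null) {
        sent.add(envelope.body);
      }
      emitted.add(envelope.passThrough);
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<Envelope<Integer>> input = new ArrayList<>();
    input.add(Envelope.withMessage("a", 1));
    input.add(Envelope.passThroughOnly(2)); // nothing is sent for this element
    input.add(Envelope.withMessage("c", 3));

    PassThroughModel producer = new PassThroughModel();
    System.out.println("emitted: " + producer.run(input));
    System.out.println("sent:    " + producer.sent);
  }
}
```

Every element reaches the downstream stage in this model, so a later step (for example an offset commit) sees all pass-through values even when some elements produced no message.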

Producer Settings

The Alpakka Jakarta Messaging producer is configured via default settings in the HOCON config file section alpakka.jakarta-jms.producer in your application.conf, and settings may be tweaked in the code using the withXyz methods.

The JmsProducerSettings factories allow for passing the actor system to read from the default alpakka.jakarta-jms.producer section, or you may pass a Config instance which is resolved to a section of the same structure.

Scala
val producerConfig: Config = system.settings.config.getConfig(JmsProducerSettings.configPath)
val settings = JmsProducerSettings(producerConfig, connectionFactory)
  .withTopic("target-topic")
  .withCredentials(Credentials("username", "password"))
  .withSessionCount(1)
Java
Config producerConfig = config.getConfig(JmsProducerSettings.configPath());
JmsProducerSettings settings =
    JmsProducerSettings.create(producerConfig, new ActiveMQConnectionFactory(brokerUrl))
        .withTopic("target-topic")
        .withCredentials(Credentials.create("username", "password"))
        .withConnectionRetrySettings(retrySettings)
        .withSendRetrySettings(sendRetrySettings)
        .withSessionCount(10)
        .withTimeToLive(Duration.ofHours(1));

The producer can be configured with the following settings. On the second tab, the section from reference.conf shows the structure to use for configuring multiple set-ups.

Table
Setting | Defaults | Description
connectionFactory | mandatory | Factory to use for creating JMS connections
destination | mandatory | Destination (queue or topic) to send JMS messages to
credentials | optional | JMS broker credentials
connectionRetrySettings | default settings | Retry characteristics if the connection fails to be established or takes too long. See the default values under Connection Retries
sendRetrySettings | default settings | Retry characteristics if sending a message fails. See the default values under Send Retries
sessionCount | defaults to 1 | Number of parallel sessions to use for sending JMS messages. Increasing the number of parallel sessions increases throughput at the cost of message ordering. While the messages may arrive out of order on the JMS broker, the producer flow outputs messages in the order they are received
timeToLive | optional | Time messages should be kept on the JMS broker. This setting can be overridden on individual messages. If not set, messages will never expire
connectionStatusSubscriptionTimeout | 5 seconds | Time to wait for a subscriber of connection status events before starting to discard them
reference.conf
# Jms Producer Settings
# sets default values
producer {
  # Configure connection retrying by providing settings for ConnectionRetrySettings.
  connection-retry = ${alpakka.jakarta-jms.connection-retry}
  # Configure re-sending by providing settings for SendRetrySettings.
  send-retry = ${alpakka.jakarta-jms.send-retry}
  # Credentials to connect to the JMS broker.
  # credentials {
  #   username = "some text"
  #   password = "some text"
  # }
  # "off" to not use any credentials.
  credentials = off
  # Number of parallel sessions to use for sending JMS messages.
  # Increasing the number of parallel sessions increases throughput at the cost of message ordering.
  # While the messages may arrive out of order on the JMS broker, the producer flow outputs messages
  # in the order they are received.
  session-count = 1
  # Time messages should be kept on the JMS broker.
  # This setting can be overridden on individual messages.
  # "off" to not let messages expire.
  time-to-live = off
  # How long the stage should preserve connection status events for the first subscriber before discarding them
  connection-status-subscription-timeout = 5 seconds
}
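
The ordering guarantee described for session-count can be illustrated without any JMS dependency. The sketch below is an assumption-laden model, not how the connector manages sessions: a thread pool stands in for the parallel sessions, and `ParallelSessionsModel`, `sendAll` and `brokerArrivals` are hypothetical names. It shows how sends may complete in any order while results are still emitted in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative model only; a thread pool stands in for parallel JMS sessions. */
final class ParallelSessionsModel {

  /** Messages in the order the simulated broker received them. */
  final ConcurrentLinkedQueue<String> brokerArrivals = new ConcurrentLinkedQueue<>();

  /** Sends all messages over sessionCount workers and returns them in input order. */
  List<String> sendAll(List<String> messages, int sessionCount) {
    ExecutorService sessions = Executors.newFixedThreadPool(sessionCount);
    try {
      List<CompletableFuture<String>> inFlight = new ArrayList<>();
      for (String message : messages) {
        inFlight.add(
            CompletableFuture.supplyAsync(
                () -> {
                  brokerArrivals.add(message); // arrival order depends on thread scheduling
                  return message;
                },
                sessions));
      }
      List<String> emitted = new ArrayList<>();
      for (CompletableFuture<String> send : inFlight) {
        emitted.add(send.join()); // wait in submission order, so output order is stable
      }
      return emitted;
    } finally {
      sessions.shutdown();
    }
  }

  public static void main(String[] args) {
    ParallelSessionsModel model = new ParallelSessionsModel();
    List<String> input = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
    System.out.println("emitted downstream: " + model.sendAll(input, 4));
    System.out.println("arrived at broker:  " + model.brokerArrivals);
  }
}
```

With a session count of 1 the two orders coincide in this model; with more workers the arrival order may differ between runs, while the emitted order stays the same.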

Connection Retries

When a connection to a broker cannot be established and errors out, or is timing out being established or started, the connection can be retried. All JMS publishers, consumers, and browsers are configured with connection retry settings. On the second tab the section from reference.conf shows the structure to use for configuring multiple set-ups.

Table
Setting | Description | Default Value
connectTimeout | Time allowed to establish and start a connection | 10 s
initialRetry | Wait time before retrying the first time | 100 ms
backoffFactor | Back-off factor for subsequent retries | 2.0
maxBackoff | Maximum back-off time allowed, after which all retries will happen after this delay | 1 minute
maxRetries | Maximum number of retries allowed (negative value is infinite) | 10
reference.conf
# Connection Retry Settings
# these set the defaults for Consumer, Producer, and Browse settings
connection-retry {
  # Time allowed to establish and start a connection.
  connect-timeout = 10 seconds
  # Wait time before retrying the connection the first time.
  initial-retry = 100 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 2
  # Maximum back-off time allowed, after which all retries will happen after this delay.
  max-backoff = 1 minute
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}

The retry time is calculated by:

$$\text{initialRetry} \times \text{retryNumber}^{\text{backoffFactor}}$$

With the default settings, retries follow pauses of 100 ms, 400 ms, 900 ms, and so on. Once a pause would exceed maxBackoff (1 minute), all subsequent retries wait 1 minute.
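
As a worked example of this formula, the following sketch computes the pause before each connection retry. It is a minimal model under stated assumptions: the class and method names are hypothetical, and rounding to whole milliseconds is an assumption rather than documented connector behaviour.

```java
import java.time.Duration;

/** Illustrative model of the back-off formula; not the connector's code. */
final class ConnectionRetryBackoff {

  /** Pause before the given retry (1-based), capped at maxBackoff. */
  static Duration delayFor(
      int retryNumber, Duration initialRetry, double backoffFactor, Duration maxBackoff) {
    // initialRetry * retryNumber ^ backoffFactor, rounded to whole milliseconds (assumption)
    double millis = initialRetry.toMillis() * Math.pow(retryNumber, backoffFactor);
    long capped = Math.min(Math.round(millis), maxBackoff.toMillis());
    return Duration.ofMillis(capped);
  }

  public static void main(String[] args) {
    Duration initialRetry = Duration.ofMillis(100); // default initial-retry
    Duration maxBackoff = Duration.ofMinutes(1); // default max-backoff
    for (int retry = 1; retry <= 10; retry++) {
      Duration pause = delayFor(retry, initialRetry, 2.0, maxBackoff);
      System.out.println("retry " + retry + ": " + pause.toMillis() + " ms");
    }
  }
}
```

Under this model the tenth retry waits 10 seconds, and the 1 minute cap would first apply at retry 25 (100 ms × 25² = 62 500 ms), so the cap only comes into play when maxRetries is raised above its default of 10.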

Consumers, producers and browsers try to reconnect with the same retry characteristics if a connection fails mid-stream.

All JMS settings support setting the connectionRetrySettings field using .withConnectionRetrySettings(retrySettings) on the given settings. The following shows how to create ConnectionRetrySettings:

Scala
// reiterating defaults from reference.conf
val retrySettings = ConnectionRetrySettings(system)
  .withConnectTimeout(10.seconds)
  .withInitialRetry(100.millis)
  .withBackoffFactor(2.0d)
  .withMaxBackoff(1.minute)
  .withMaxRetries(10)
Java
Config connectionRetryConfig = config.getConfig("alpakka.jakarta-jms.connection-retry");
// reiterating the values from reference.conf
ConnectionRetrySettings retrySettings =
    ConnectionRetrySettings.create(connectionRetryConfig)
        .withConnectTimeout(Duration.ofSeconds(10))
        .withInitialRetry(Duration.ofMillis(100))
        .withBackoffFactor(2.0)
        .withMaxBackoff(Duration.ofMinutes(1))
        .withMaxRetries(10);

Send Retries

When a connection to a broker starts failing, sending JMS messages will also fail. Those failed messages can be retried at the cost of potentially duplicating the failed messages. Send retries can be configured as follows:

Table
Setting | Description | Default Value
initialRetry | Wait time before retrying the first time | 20 ms
backoffFactor | Back-off factor for subsequent retries | 1.5
maxBackoff | Maximum back-off time allowed, after which all retries will happen after this delay | 500 ms
maxRetries | Maximum number of retries allowed (negative value is infinite) | 10
reference.conf
# Send Retry Settings
# these set the defaults for Producer settings
send-retry {
  # Wait time before retrying the first time.
  initial-retry = 20 millis
  # Back-off factor for subsequent retries.
  backoff-factor = 1.5
  # Maximum back-off time allowed, after which all retries will happen after this delay.
  max-backoff = 500 millis
  # Maximum number of retries allowed.
  # "infinite", or positive integer
  max-retries = 10
}

The retry time is calculated by:

$$\text{initialRetry} \times \text{retryNumber}^{\text{backoffFactor}}$$

With the default settings, retries follow pauses of 20 ms, 57 ms, 104 ms, and so on. Once a pause would exceed maxBackoff (500 ms), all subsequent retries wait 500 ms.
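
To get a feel for how long a message may be held up by send retries, the sketch below lists the full pause schedule for the default send-retry values and adds the pauses up. It is a rough model only: the names are hypothetical, rounding to whole milliseconds is an assumption, and the time taken by the send attempts themselves is ignored.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the send-retry schedule; not the connector's implementation. */
final class SendRetrySchedule {

  /** Pauses in milliseconds before retries 1..maxRetries, each capped at maxBackoffMillis. */
  static List<Long> pauses(
      long initialRetryMillis, double backoffFactor, long maxBackoffMillis, int maxRetries) {
    List<Long> pauses = new ArrayList<>();
    for (int retryNumber = 1; retryNumber <= maxRetries; retryNumber++) {
      long uncapped = Math.round(initialRetryMillis * Math.pow(retryNumber, backoffFactor));
      pauses.add(Math.min(uncapped, maxBackoffMillis));
    }
    return pauses;
  }

  /** Sum of all pauses: the waiting time accumulated if every retry fails. */
  static long totalPauseMillis(List<Long> pauses) {
    long total = 0;
    for (long pause : pauses) {
      total += pause;
    }
    return total;
  }

  public static void main(String[] args) {
    // Defaults from the send-retry section: 20 ms, factor 1.5, cap 500 ms, 10 retries.
    List<Long> schedule = pauses(20, 1.5, 500, 10);
    System.out.println("pauses (ms): " + schedule);
    System.out.println("total (ms):  " + totalPauseMillis(schedule));
  }
}
```

In this model the cap is reached at the ninth retry, and the pauses add up to roughly 2.7 seconds before the send would finally fail.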

JMS producer settings support configuring retries by using .withSendRetrySettings(retrySettings). The following shows how to create SendRetrySettings:

Scala
// reiterating defaults from reference.conf
val sendRetrySettings = SendRetrySettings(system)
  .withInitialRetry(20.millis)
  .withBackoffFactor(1.5d)
  .withMaxBackoff(500.millis)
  .withMaxRetries(10)
Java
import com.typesafe.config.Config;
import scala.Option;

Config sendRetryConfig = config.getConfig("alpakka.jakarta-jms.send-retry");
// reiterating the values from reference.conf
SendRetrySettings sendRetrySettings =
    SendRetrySettings.create(sendRetryConfig)
        .withInitialRetry(Duration.ofMillis(20))
        .withBackoffFactor(1.5d)
        .withMaxBackoff(Duration.ofMillis(500))
        .withMaxRetries(10);

If a send operation finally fails, the stage also fails unless a different supervision strategy is applied. The producer stage honours stream supervision.

Observing connectivity and state of a JMS producer

The materialized value of every JMS producer is of type JmsProducerStatus. It provides a connectorState method that returns a Source of JmsConnectorState updates, publishing connection attempts, disconnections, completions, and failures. The source completes after the JMS producer completes or fails.
