Using IBM MQ

You can use IBM MQ like any other JMS provider by creating a QueueConnectionFactory or a TopicConnectionFactory and creating JmsConsumerSettings or JmsProducerSettings from it. The snippets below have been tested with the default IBM MQ Docker image, which contains queues and topics for testing. The following command starts MQ 9 using Docker:

docker run --env LICENSE=accept --env MQ_QMGR_NAME=QM1 --publish 1414:1414 --publish 9443:9443 ibmcom/mq:9.1.1.0

MQ settings for this image are shown here: https://github.com/ibm-messaging/mq-docker#mq-developer-defaults
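
Before wiring up the streams, it can help to confirm that the listener port published by the command above accepts TCP connections. The following is a minimal sketch using only the JDK. `MqPortCheck` and `isReachable` are hypothetical names and are not part of Alpakka or the IBM MQ client. An open port only shows that something is listening, not that the queue manager has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Alpakka or the IBM MQ client.
final class MqPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved
            return false;
        }
    }

    public static void main(String[] args) {
        // 1414 is the listener port published by the docker command above
        System.out.println("MQ listener reachable: " + isReachable("localhost", 1414, 2000));
    }
}
```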

Artifacts

sbt
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-jms" % "1.1.1",
  "javax.jms" % "jms" % "1.1",
  "com.ibm.mq" % "com.ibm.mq.allclient" % "9.1.1.0"
)
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-jms_2.12</artifactId>
  <version>1.1.1</version>
</dependency>
<dependency>
  <groupId>javax.jms</groupId>
  <artifactId>jms</artifactId>
  <version>1.1</version>
</dependency>
<dependency>
  <groupId>com.ibm.mq</groupId>
  <artifactId>com.ibm.mq.allclient</artifactId>
  <version>9.1.1.0</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-jms_2.12', version: '1.1.1'
  compile group: 'javax.jms', name: 'jms', version: '1.1'
  compile group: 'com.ibm.mq', name: 'com.ibm.mq.allclient', version: '9.1.1.0'
}

Create an MQConnectionFactory

The MQConnectionFactory needs a queue manager name and a channel name. The Docker command used in the previous section sets up a QM1 queue manager and a DEV.APP.SVRCONN channel. The IBM MQ client can connect to the MQ server either over TCP/IP or natively through JNI (when the client and server run on the same machine). The examples below use TCP/IP, which is selected by setting the transport type to CommonConstants.WMQ_CM_CLIENT.

Depending on the connection target (queue or topic), choose the matching connection factory implementation, for example MQQueueConnectionFactory or MQTopicConnectionFactory.

Scala
// Create the IBM MQ MQQueueConnectionFactory
val connectionFactory = new MQQueueConnectionFactory()

// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost")
connectionFactory.setPort(1414)
connectionFactory.setQueueManager("QM1")
connectionFactory.setChannel("DEV.APP.SVRCONN")
Java
// Create the IBM MQ MQQueueConnectionFactory
MQQueueConnectionFactory connectionFactory = new MQQueueConnectionFactory();

// align to docker image: ibmcom/mq:9.1.1.0
connectionFactory.setHostName("localhost");
connectionFactory.setPort(1414);
connectionFactory.setQueueManager("QM1");
connectionFactory.setChannel("DEV.APP.SVRCONN");
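
The values above are hardcoded to match the Docker image. As a hedged sketch of one way to externalize them, the helper below reads the four values from a map such as `System.getenv()` and falls back to the defaults used above. `MqConnectionParams` and the variable names `MQ_HOST`, `MQ_PORT` and `MQ_CHANNEL` are assumptions made for illustration. `MQ_QMGR_NAME` is borrowed from the docker command, but reusing it on the client side is also an assumption.

```java
import java.util.Map;

// Hypothetical holder for the four values passed to the connection factory setters.
final class MqConnectionParams {
    final String host;
    final int port;
    final String queueManager;
    final String channel;

    MqConnectionParams(String host, int port, String queueManager, String channel) {
        this.host = host;
        this.port = port;
        this.queueManager = queueManager;
        this.channel = channel;
    }

    // Reads the values from the given map (for example System.getenv()),
    // falling back to the defaults of the ibmcom/mq:9.1.1.0 image used above.
    static MqConnectionParams fromEnv(Map<String, String> env) {
        return new MqConnectionParams(
            env.getOrDefault("MQ_HOST", "localhost"),
            Integer.parseInt(env.getOrDefault("MQ_PORT", "1414")),
            env.getOrDefault("MQ_QMGR_NAME", "QM1"),
            env.getOrDefault("MQ_CHANNEL", "DEV.APP.SVRCONN"));
    }
}
```

The resulting fields can then be passed to setHostName, setPort, setQueueManager and setChannel in place of the literals.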

Create a JmsConsumer and JmsProducer to a Queue

Scala
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val queueName = "DEV.QUEUE.1"

val jmsSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings(producerConfig, queueConnectionFactory)
    .withQueue(queueName)
)

// Option 1: create the Source from the destination name, using the default factory
val jmsSource: Source[TxEnvelope, JmsConsumerControl] = JmsConsumer.txSource(
  JmsConsumerSettings(consumerConfig, queueConnectionFactory)
    .withQueue(queueName)
)
Java
// Connect to IBM MQ over TCP/IP
queueConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String queueName = "DEV.QUEUE.1";

Sink<String, CompletionStage<Done>> jmsSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, queueConnectionFactory).withQueue(queueName));

// Option 1: create the Source from the destination name, using the default factory
Source<TxEnvelope, JmsConsumerControl> txJmsSource =
    JmsConsumer.txSource(
        JmsConsumerSettings.create(system, queueConnectionFactory).withQueue(queueName));

Create a JmsConsumer and JmsProducer to a Topic

The IBM MQ Docker container sets up a dev/ topic, which is used in the example below.

Scala
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT)
val testTopicName = "dev/"

val jmsTopicSink: Sink[String, Future[Done]] = JmsProducer.textSink(
  JmsProducerSettings
    .create(system, topicConnectionFactory)
    .withTopic(testTopicName)
)

// Option 1: create the Source from the destination name, using the default factory
val jmsTopicSource: Source[String, JmsConsumerControl] = JmsConsumer
  .textSource(
    JmsConsumerSettings
      .create(system, topicConnectionFactory)
      .withTopic(testTopicName)
  )
Java
// Connect to IBM MQ over TCP/IP
topicConnectionFactory.setTransportType(CommonConstants.WMQ_CM_CLIENT);
String testTopicName = "dev/";

Sink<String, CompletionStage<Done>> jmsTopicSink =
    JmsProducer.textSink(
        JmsProducerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

// Option 1: create the Source from the destination name, using the default factory
Source<String, JmsConsumerControl> jmsTopicSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, topicConnectionFactory).withTopic(testTopicName));

Create a JmsConsumer and JmsProducer to a custom destination

The example below uses a custom queue.

Scala
// Option 2: create the Source using a custom destination factory
val customQueue = "DEV.QUEUE.3"
val jmsSource: Source[String, JmsConsumerControl] = JmsConsumer.textSource(
  JmsConsumerSettings
    .create(system, queueConnectionFactory)
    .withDestination(CustomDestination("custom", createQueue(customQueue)))
)

def createQueue(destinationName: String): Session => javax.jms.Queue = { (session: Session) =>
  // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
  val mqSession = session.asInstanceOf[MQQueueSession]
  mqSession.createQueue(destinationName)
}
Java
// Option 2: create the Source using a custom destination factory
String customQueue = "DEV.QUEUE.3";
Source<String, JmsConsumerControl> jmsSource =
    JmsConsumer.textSource(
        JmsConsumerSettings.create(system, queueConnectionFactory)
            .withDestination(new CustomDestination("custom", createQueue(customQueue))));

Function<Session, Destination> createQueue(String destinationName) {
  return (session) -> {
    // cast to correct session implementation: MQQueueSession, MQTopicSession, MQSession
    MQQueueSession mqSession = (MQQueueSession) session;
    try {
      return mqSession.createQueue(destinationName);
    } catch (JMSException e) {
      throw new RuntimeException(e);
    }
  };
}