JMS

Example: Read text messages from JMS queue and append to file

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (3),
  • and appends the data to the file target/out.txt (2).
Scala
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

// Text messages received from the JMS queue "test".
val jmsSource: Source[String, JmsConsumerControl] = {      // (1)
  val consumerSettings =
    JmsConsumerSettings(actorSystem, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  JmsConsumer.textSource(consumerSettings)
}

// Sink that writes the incoming ByteStrings to the file target/out.txt.
val fileSink: Sink[ByteString, Future[IOResult]] = {       // (2)
  val outputPath = Paths.get("target/out.txt")
  FileIO.toPath(outputPath)
}

// Keep.both retains the consumer control and the file sink's completion future.
val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) =
                                                   // stream element type
  jmsSource                                        //: String
    .map(text => ByteString(text))                 //: ByteString    (3)
    .toMat(fileSink)(Keep.both)
    .run()
Java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.IOResult;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.nio.file.Paths;
import java.util.concurrent.CompletionStage;


// Text messages received from the JMS queue "test".
JmsConsumerSettings consumerSettings =
    JmsConsumerSettings.create(system, connectionFactory).withQueue("test");
Source<String, JmsConsumerControl> jmsSource = // (1)
    JmsConsumer.textSource(consumerSettings);

// Sink that writes the incoming ByteStrings to the file target/out.txt.
Sink<ByteString, CompletionStage<IOResult>> fileSink =
    FileIO.toPath(Paths.get("target/out.txt")); // (2)

// Keep.both retains the consumer control and the file sink's completion stage.
Pair<JmsConsumerControl, CompletionStage<IOResult>> pair =
    jmsSource // : String
        .map(text -> ByteString.fromString(text)) // : ByteString    (3)
        .toMat(fileSink, Keep.both())
        .run(materializer);

Example: Read text messages from JMS queue and create one file per message

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • combines the incoming data with a counter (3),
  • creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (4).
Scala
import java.nio.file.Paths

import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration.DurationInt

// Text messages received from the JMS queue "test".
val jmsSource: Source[String, JmsConsumerControl] = {                                 // (1)
  val consumerSettings =
    JmsConsumerSettings(actorSystem, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  JmsConsumer.textSource(consumerSettings)
}

// Every message is written to its own file, named after the message's index in the stream.
// Keep.left retains only the consumer control as the materialized value.
                                                          // stream element type
val runningSource: JmsConsumerControl = jmsSource         //: String
  .map(text => ByteString(text))                          //: ByteString         (2)
  .zipWithIndex                                           //: (ByteString, Long) (3)
  .mapAsyncUnordered(parallelism = 5) { indexed =>
    val (payload, index) = indexed
    val targetFile = Paths.get(s"target/out-$index.txt")
    Source                                                //                     (4)
      .single(payload)
      .runWith(FileIO.toPath(targetFile))
  }                                                       //: IOResult
  .toMat(Sink.ignore)(Keep.left)
  .run()
Java
import akka.Done;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.KillSwitch;
import akka.stream.Materializer;
import akka.stream.alpakka.jms.JmsConsumerSettings;
import akka.stream.alpakka.jms.JmsProducerSettings;
import akka.stream.alpakka.jms.javadsl.JmsConsumer;
import akka.stream.alpakka.jms.javadsl.JmsConsumerControl;
import akka.stream.alpakka.jms.javadsl.JmsProducer;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import java.nio.file.Paths;
import java.util.Arrays;
import java.util.concurrent.CompletionStage;


// Text messages received from the JMS queue "test".
JmsConsumerSettings consumerSettings =
    JmsConsumerSettings.create(system, connectionFactory).withQueue("test");
Source<String, JmsConsumerControl> jmsConsumer = // (1)
    JmsConsumer.textSource(consumerSettings);

// Every message is written to its own file, named after the message's index in the stream.
int parallelism = 5;
Pair<JmsConsumerControl, CompletionStage<Done>> pair =
    jmsConsumer // : String
        .map(text -> ByteString.fromString(text)) // : ByteString             (2)
        .zipWithIndex() // : Pair<ByteString, Long> (3)
        .mapAsyncUnordered(
            parallelism,
            indexed ->
                Source // (4)
                    .single(indexed.first())
                    .runWith(
                        FileIO.toPath(Paths.get("target/out-" + indexed.second() + ".txt")),
                        materializer)) // : IOResult
        .toMat(Sink.ignore(), Keep.both())
        .run(materializer);

Example: Read text messages from JMS queue and send to web server

  • listens to the JMS queue “test” receiving Strings (1),
  • converts incoming data to akka.util.ByteString (2),
  • puts the received text into an HttpRequest (3),
  • sends the created request via Akka HTTP (4),
  • prints the HttpResponse to standard out (5).
Scala
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

// Text messages received from the JMS queue "test".
val jmsSource: Source[String, JmsConsumerControl] = {                               // (1)
  val consumerSettings =
    JmsConsumerSettings(actorSystem, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  JmsConsumer.textSource(consumerSettings)
}

// Keep.both retains the consumer control and the completion future of the printing sink.
// NOTE(review): the HttpResponse is only printed; its entity is never read here —
// confirm that is acceptable for the responses this endpoint returns.
val (runningSource, finished): (JmsConsumerControl, Future[Done]) =
  jmsSource                                                         //: String
    .map(text => ByteString(text))                                  //: ByteString   (2)
    .map { payload =>
      val target = Uri("http://localhost:8080/hello")
      HttpRequest(uri = target, entity = HttpEntity(payload))       //: HttpRequest  (3)
    }
    .mapAsyncUnordered(4)(request => Http().singleRequest(request)) //: HttpResponse (4)
    .toMat(Sink.foreach(println))(Keep.both)                        //               (5)
    .run()
Java

import akka.Done; import akka.actor.ActorSystem; import akka.http.javadsl.Http; import akka.http.javadsl.model.HttpRequest; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.concurrent.CompletionStage; final Http http = Http.get(system); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); int parallelism = 4; Pair<JmsConsumerControl, CompletionStage<Done>> pair = jmsSource // : String .map(ByteString::fromString) // : ByteString (2) .map( bs -> HttpRequest.create("http://localhost:8080/hello") .withEntity(bs)) // : HttpRequest (3) .mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4) .toMat(Sink.foreach(System.out::println), Keep.both()) // (5) .run(materializer);

Example: Read text messages from JMS queue and send to web socket

  • listens to the JMS queue “test” receiving Strings (1),
  • configures a web socket flow to localhost (2),
  • converts incoming data to a ws.TextMessage (Scala) / akka.http.javadsl.model.ws.TextMessage (Java) (3),
  • passes the message via the web socket flow (4),
  • converts the (potentially chunked) web socket reply to a String (5),
  • prefixes the String (6),
  • ends the stream by writing the values to standard out (7).
Scala
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

// Text messages received from the JMS queue "test".
val jmsSource: Source[String, JmsConsumerControl] = {
  val consumerSettings =
    JmsConsumerSettings(actorSystem, connectionFactory)
      .withBufferSize(10)
      .withQueue("test")
  JmsConsumer.textSource(consumerSettings)                                          // (1)
}

// Client-side WebSocket flow to the local ping endpoint.
val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = { // (2)
  val pingRequest = WebSocketRequest("ws://localhost:8080/webSocket/ping")
  Http().webSocketClientFlow(pingRequest)
}

// Materialized values kept: consumer control, WebSocket upgrade response, and the
// completion future of the printing sink.
val ((runningSource, wsUpgradeResponse), streamCompletion): (
  (JmsConsumerControl, Future[WebSocketUpgradeResponse]),
  Future[Done]
) =
                                                       // stream element type
  jmsSource                                            //: String
    .map(text => ws.TextMessage(text))                 //: ws.TextMessage              (3)
    .viaMat(webSocketFlow)(Keep.both)                  //: ws.TextMessage              (4)
    .mapAsync(1)(wsMessageToString)                    //: String                      (5)
    .map(reply => "client received: " + reply)         //: String                      (6)
    .toMat(Sink.foreach(println))(Keep.both)           //                              (7)
    .run()

/**
 * Convert potentially chunked WebSocket Message to a string.
 *
 * Strict text messages yield their text directly; streamed text messages are
 * collected chunk by chunk and concatenated. Binary messages are rendered with
 * `toString`, and their data stream is drained so unread chunks are not left
 * pending on the connection.
 *
 * @return a function from a WebSocket message to the future string form of it
 */
def wsMessageToString: ws.Message => Future[String] = {
  case strict: ws.TextMessage.Strict =>
    Future.successful(strict.text)

  case streamed: ws.TextMessage.Streamed =>
    // Gather every chunk of the streamed text, then join them into one String.
    streamed.textStream.runWith(Sink.seq).map(_.mkString)

  case binary: ws.BinaryMessage =>
    // The binary payload is not needed; drain it instead of leaving it unread.
    binary.dataStream.runWith(Sink.ignore)
    Future.successful(binary.toString)
}
Java

import akka.Done; import akka.actor.ActorSystem; import akka.http.javadsl.Http; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.model.ws.Message; import akka.http.javadsl.model.ws.TextMessage; import akka.http.javadsl.model.ws.WebSocketRequest; import akka.http.javadsl.model.ws.WebSocketUpgradeResponse; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; final Http http = Http.get(system); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory) .withBufferSize(10) .withQueue("test")); Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow = // (2) http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping")); int parallelism = 4; Pair<Pair<JmsConsumerControl, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>> pair = jmsSource // : String .map( s -> { Message msg = TextMessage.create(s); return msg; }) // : Message (3) .viaMat(webSocketFlow, Keep.both()) // : Message (4) .mapAsync(parallelism, this::wsMessageToString) // : String (5) .map(s -> "client received: " + s) // : String (6) .toMat(Sink.foreach(System.out::println), Keep.both()) // (7) .run(materializer); /** Convert potentially chunked 
WebSocket Message to a string. */ private CompletionStage<String> wsMessageToString(Message msg) { if (msg.isText()) { TextMessage tMsg = msg.asTextMessage(); if (tMsg.isStrict()) { return CompletableFuture.completedFuture(tMsg.getStrictText()); } else { CompletionStage<List<String>> strings = tMsg.getStreamedText().runWith(Sink.seq(), materializer); return strings.thenApply(list -> String.join("", list)); } } else { return CompletableFuture.completedFuture(msg.toString()); } }

Running the example code

This example is contained in a stand-alone runnable main; it can be run from sbt like this:

Scala
sbt
> doc-examples/run
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.