JMS
Example: Read text messages from JMS queue and append to file
- listens to the JMS queue “test” receiving Strings (1),
- converts incoming data to akka.util.ByteString (3),
- and appends the data to the file target/out.txt (2).
- Scala
-
import java.nio.file.Paths
import akka.stream.IOResult
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

// Streams text messages from the JMS queue "test" into the file target/out.txt.
// `actorSystem` and `connectionFactory` are not defined in this snippet and must
// already be in scope.

// Source of the text bodies of messages on queue "test"; buffer size set to 10.
val jmsSource: Source[String, JmsConsumerControl] = // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test")
  )

// Sink writing the incoming bytes to target/out.txt.
val fileSink: Sink[ByteString, Future[IOResult]] = // (2)
  FileIO.toPath(Paths.get("target/out.txt"))

// Keep.both retains both materialized values: the consumer control from the
// source and the Future[IOResult] from the file sink.
val (runningSource, finished): (JmsConsumerControl, Future[IOResult]) =
                                      // stream element type
  jmsSource                           //: String
    .map(ByteString(_))               //: ByteString (3)
    .toMat(fileSink)(Keep.both)
    .run()
- Java
-
import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.IOResult; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import java.nio.file.Paths; import java.util.concurrent.CompletionStage; Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toPath(Paths.get("target/out.txt")); // (2) Pair<JmsConsumerControl, CompletionStage<IOResult>> pair = jmsSource // : String .map(ByteString::fromString) // : ByteString (3) .toMat(fileSink, Keep.both()) .run(materializer);
Example: Read text messages from JMS queue and create one file per message
- listens to the JMS queue “test” receiving Strings (1),
- converts incoming data to akka.util.ByteString (2),
- combines the incoming data with a counter (3),
- creates an intermediary stream writing the incoming data to a file using the counter value to create unique file names (4).
- Scala
-
import java.nio.file.Paths
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.duration.DurationInt

// Writes every text message from the JMS queue "test" to its own file,
// target/out-<index>.txt, where <index> comes from zipWithIndex.
// `actorSystem` and `connectionFactory` are not defined in this snippet and must
// already be in scope.

// Source of the text bodies of messages on queue "test"; buffer size set to 10.
val jmsSource: Source[String, JmsConsumerControl] = // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test")
  )

// Keep.left retains only the consumer control; the results of the per-message
// file writes are dropped by Sink.ignore.
                                                    // stream element type
val runningSource = jmsSource                       //: String
  .map(ByteString(_))                               //: ByteString         (2)
  .zipWithIndex                                     //: (ByteString, Long) (3)
  .mapAsyncUnordered(parallelism = 5) {
    case (byteStr, number) =>
      // One short-lived inner stream per message, writing a single element to a file.
      Source                                        //                     (4)
        .single(byteStr)
        .runWith(FileIO.toPath(Paths.get(s"target/out-$number.txt")))
  }                                                 //: IOResult
  .toMat(Sink.ignore)(Keep.left)
  .run()
- Java
-
import akka.Done; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.KillSwitch; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import java.nio.file.Paths; import java.util.Arrays; import java.util.concurrent.CompletionStage; Source<String, JmsConsumerControl> jmsConsumer = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); int parallelism = 5; Pair<JmsConsumerControl, CompletionStage<Done>> pair = jmsConsumer // : String .map(ByteString::fromString) // : ByteString (2) .zipWithIndex() // : Pair<ByteString, Long> (3) .mapAsyncUnordered( parallelism, (in) -> { ByteString byteString = in.first(); Long number = in.second(); return Source // (4) .single(byteString) .runWith( FileIO.toPath(Paths.get("target/out-" + number + ".txt")), materializer); }) // : IoResult .toMat(Sink.ignore(), Keep.both()) .run(materializer);
Example: Read text messages from JMS queue and send to web server
- listens to the JMS queue “test” receiving Strings (1),
- converts incoming data to akka.util.ByteString (2),
- puts the received text into an HttpRequest (3),
- sends the created request via Akka HTTP (4),
- prints the HttpResponse to standard out (5).
- Scala
-
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

// Sends every text message from the JMS queue "test" as the entity of an HTTP
// request to http://localhost:8080/hello and prints each response.
// `actorSystem` and `connectionFactory` are not defined in this snippet and must
// already be in scope.

// Source of the text bodies of messages on queue "test"; buffer size set to 10.
val jmsSource: Source[String, JmsConsumerControl] = // (1)
  JmsConsumer.textSource(
    JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test")
  )

// Keep.both retains the consumer control and the Future[Done] of Sink.foreach.
// NOTE(review): the HttpResponse is only printed; its entity is never consumed or
// discarded here. Akka HTTP documents that response entities should be consumed
// or discarded - confirm this is acceptable for the sample.
val (runningSource, finished): (JmsConsumerControl, Future[Done]) =
  jmsSource                                                   //: String
    .map(ByteString(_))                                       //: ByteString   (2)
    .map { bs =>
      HttpRequest(uri = Uri("http://localhost:8080/hello"),   //: HttpRequest  (3)
                  entity = HttpEntity(bs))
    }
    .mapAsyncUnordered(4)(Http().singleRequest(_))            //: HttpResponse (4)
    .toMat(Sink.foreach(println))(Keep.both)                  //               (5)
    .run()
- Java
-
import akka.Done; import akka.actor.ActorSystem; import akka.http.javadsl.Http; import akka.http.javadsl.model.HttpRequest; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.concurrent.CompletionStage; final Http http = Http.get(system); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory).withQueue("test")); int parallelism = 4; Pair<JmsConsumerControl, CompletionStage<Done>> pair = jmsSource // : String .map(ByteString::fromString) // : ByteString (2) .map( bs -> HttpRequest.create("http://localhost:8080/hello") .withEntity(bs)) // : HttpRequest (3) .mapAsyncUnordered(parallelism, http::singleRequest) // : HttpResponse (4) .toMat(Sink.foreach(System.out::println), Keep.both()) // (5) .run(materializer);
Example: Read text messages from JMS queue and send to web socket
- listens to the JMS queue “test” receiving Strings (1),
- configures a web socket flow to localhost (2),
- converts incoming data to a ws.TextMessage (Scala) or akka.http.javadsl.model.ws.TextMessage (Java) (3),
- passes the message via the web socket flow (4),
- converts the (potentially chunked) web socket reply to a String (5),
- prefixes the String (6),
- ends the stream by writing the values to standard out (7).
- Scala
-
import akka.Done
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws.{WebSocketRequest, WebSocketUpgradeResponse}
import akka.stream.alpakka.jms.JmsConsumerSettings
import akka.stream.alpakka.jms.scaladsl.{JmsConsumer, JmsConsumerControl}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.Future

// Sends every text message from the JMS queue "test" through a WebSocket client
// flow to ws://localhost:8080/webSocket/ping and prints each reply, prefixed with
// "client received: ".
// `actorSystem` and `connectionFactory` are not defined in this snippet and must
// already be in scope.

// Source of the text bodies of messages on queue "test"; buffer size set to 10.
val jmsSource: Source[String, JmsConsumerControl] =
  JmsConsumer.textSource(                                                           // (1)
    JmsConsumerSettings(actorSystem, connectionFactory).withBufferSize(10).withQueue("test")
  )

// WebSocket client flow; materializes the upgrade response of the handshake.
val webSocketFlow: Flow[ws.Message, ws.Message, Future[WebSocketUpgradeResponse]] = // (2)
  Http().webSocketClientFlow(WebSocketRequest("ws://localhost:8080/webSocket/ping"))

// Both Keep.both combinators nest the materialized values as
// ((consumer control, upgrade response), completion of Sink.foreach).
val ((runningSource, wsUpgradeResponse), streamCompletion): ((JmsConsumerControl, Future[WebSocketUpgradeResponse]), Future[Done]) =
                                              // stream element type
  jmsSource                                   //: String
    .map(ws.TextMessage(_))                   //: ws.TextMessage (3)
    .viaMat(webSocketFlow)(Keep.both)         //: ws.TextMessage (4)
    .mapAsync(1)(wsMessageToString)           //: String         (5)
    .map("client received: " + _)             //: String         (6)
    .toMat(Sink.foreach(println))(Keep.both)  //                 (7)
    .run()

/**
 * Convert potentially chunked WebSocket Message to a string.
 *
 * Strict text messages yield their text directly; streamed text messages are
 * collected with `Sink.seq` and concatenated; any other message falls back to
 * its `toString`.
 *
 * Mapping over the collected `Future` needs an implicit `ExecutionContext`,
 * which is assumed to be in scope outside this snippet.
 */
def wsMessageToString: ws.Message => Future[String] = {
  case message: ws.TextMessage.Strict =>
    Future.successful(message.text)

  case message: ws.TextMessage.Streamed =>
    val chunks = message.textStream.runWith(Sink.seq)
    chunks.map(_.mkString)

  case message =>
    Future.successful(message.toString)
}
- Java
-
import akka.Done; import akka.actor.ActorSystem; import akka.http.javadsl.Http; import akka.http.javadsl.model.StatusCodes; import akka.http.javadsl.model.ws.Message; import akka.http.javadsl.model.ws.TextMessage; import akka.http.javadsl.model.ws.WebSocketRequest; import akka.http.javadsl.model.ws.WebSocketUpgradeResponse; import akka.japi.Pair; import akka.stream.ActorMaterializer; import akka.stream.Materializer; import akka.stream.alpakka.jms.JmsConsumerSettings; import akka.stream.alpakka.jms.JmsProducerSettings; import akka.stream.alpakka.jms.javadsl.JmsConsumer; import akka.stream.alpakka.jms.javadsl.JmsConsumerControl; import akka.stream.alpakka.jms.javadsl.JmsProducer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import playground.ActiveMqBroker; import playground.WebServer; import scala.concurrent.ExecutionContext; import javax.jms.ConnectionFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; final Http http = Http.get(system); Source<String, JmsConsumerControl> jmsSource = // (1) JmsConsumer.textSource( JmsConsumerSettings.create(system, connectionFactory) .withBufferSize(10) .withQueue("test")); Flow<Message, Message, CompletionStage<WebSocketUpgradeResponse>> webSocketFlow = // (2) http.webSocketClientFlow(WebSocketRequest.create("ws://localhost:8080/webSocket/ping")); int parallelism = 4; Pair<Pair<JmsConsumerControl, CompletionStage<WebSocketUpgradeResponse>>, CompletionStage<Done>> pair = jmsSource // : String .map( s -> { Message msg = TextMessage.create(s); return msg; }) // : Message (3) .viaMat(webSocketFlow, Keep.both()) // : Message (4) .mapAsync(parallelism, this::wsMessageToString) // : String (5) .map(s -> "client received: " + s) // : String (6) .toMat(Sink.foreach(System.out::println), Keep.both()) // (7) .run(materializer); /** Convert potentially chunked 
WebSocket Message to a string. */ private CompletionStage<String> wsMessageToString(Message msg) { if (msg.isText()) { TextMessage tMsg = msg.asTextMessage(); if (tMsg.isStrict()) { return CompletableFuture.completedFuture(tMsg.getStrictText()); } else { CompletionStage<List<String>> strings = tMsg.getStreamedText().runWith(Sink.seq(), materializer); return strings.thenApply(list -> String.join("", list)); } } else { return CompletableFuture.completedFuture(msg.toString()); } }
Running the example code
This example is contained in a stand-alone runnable main; it can be run from sbt like this:
- Scala
-
sbt > doc-examples/run