Browse
Browsing messages
The browse source streams the messages in a queue without consuming them.
Unlike the other sources, the browse source will complete after browsing all the messages currently on the queue.
- Scala

    val browseSource: Source[javax.jms.Message, NotUsed] = JmsConsumer.browse(
      JmsBrowseSettings(system, connectionFactory)
        .withQueue("test")
    )

    val result: Future[immutable.Seq[javax.jms.Message]] =
      browseSource.runWith(Sink.seq)

- Java

    ConnectionFactory connectionFactory = server.createConnectionFactory();

    Source<javax.jms.Message, NotUsed> browseSource =
        JmsConsumer.browse(
            JmsBrowseSettings.create(system, connectionFactory).withQueue("test"));

    CompletionStage<List<javax.jms.Message>> result =
        browseSource.runWith(Sink.seq(), system);
A JMS selector can be used to filter the messages. Without a selector, the source browses the entire content of the queue.
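As a rough illustration, a selector can also be supplied through configuration instead of code. The sketch below assumes the `selector` key of the `alpakka.jms.browse` section listed in reference.conf on this page. The expression is only an example, filtering on the standard `JMSPriority` header.

```
# application.conf (sketch; the selector expression is an illustrative assumption)
alpakka.jms.browse {
  # Browse only messages with a priority above 4.
  # An empty string leaves the selector unset, so the whole queue is browsed.
  selector = "JMSPriority > 4"
}
```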
Notes:
- Messages may be arriving and expiring while the scan is done.
- The JMS API does not require the content of an enumeration to be a static snapshot of queue content. Whether these changes are visible or not depends on the JMS provider.
- A message must not be returned by a QueueBrowser before its delivery time has been reached.
Configure JMS browse
To connect to the JMS broker, first define an appropriate javax.jms.ConnectionFactory. The Alpakka tests and all examples use ActiveMQ.
- Scala

    val connectionFactory: javax.jms.ConnectionFactory =
      new org.apache.activemq.ActiveMQConnectionFactory(url)

- Java

    javax.jms.ConnectionFactory connectionFactory = server.createConnectionFactory();
The created ConnectionFactory is then used for the creation of the different JMS sources.
The JmsBrowseSettings factories accept either the actor system, in which case the settings are read from the default alpakka.jms.browse section, or a Config instance that resolves to a section of the same structure.
- Scala

    val browseConfig: Config = system.settings.config.getConfig(JmsBrowseSettings.configPath)
    val settings = JmsBrowseSettings(browseConfig, connectionFactory)
      .withQueue("target-queue")
      .withCredentials(Credentials("username", "password"))
      .withConnectionRetrySettings(retrySettings)
- Java

    Config browseConfig = config.getConfig(JmsBrowseSettings.configPath());
    JmsBrowseSettings settings =
        JmsBrowseSettings.create(browseConfig, new ActiveMQConnectionFactory("broker-url"))
            .withQueue("target-queue")
            .withCredentials(Credentials.create("username", "password"))
            .withConnectionRetrySettings(retrySettings)
            .withAcknowledgeMode(AcknowledgeMode.AutoAcknowledge())
            .withSelector("Important = TRUE");
The Alpakka JMS browse source is configured via default settings in the HOCON config file section alpakka.jms.browse in your application.conf, and settings may be tweaked in the code using the withXyz methods. On the second tab the section from reference.conf shows the structure to use for configuring multiple set-ups.
- Table

| Setting | Description | Default Value |
|---|---|---|
| connectionFactory | Factory to use for creating JMS connections | Must be set in code |
| destination | The queue to browse | Must be set in code |
| credentials | JMS broker credentials | Empty |
| connectionRetrySettings | Retry characteristics if the connection failed to be established or is taking a long time. | See Connection Retries |

- reference.conf

    # Jms Browse Settings
    # sets default values
    browse {
      # Configure connection retrying by providing settings for ConnectionRetrySettings.
      connection-retry = ${alpakka.jms.connection-retry}

      # Credentials to connect to the JMS broker.
      # credentials {
      #   username = "some text"
      #   password = "some text"
      # }
      # "off" to not use any credentials.
      credentials = off

      # JMS selector expression.
      # See https://docs.oracle.com/cd/E19798-01/821-1841/bncer/index.html
      # empty string for unset
      selector = "" # optional

      # Set an explicit acknowledge mode.
      # See eg. javax.jms.Session.AUTO_ACKNOWLEDGE
      # Allowed values: "auto", "client", "duplicates-ok", "session", integer value
      acknowledge-mode = auto
    }
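To make the "multiple set-ups" idea concrete, one possible approach is to define an additional section that inherits the defaults and overrides only what differs. This is a minimal sketch, not a prescribed layout: the path `my-app.jms.browse-audit`, the user name, the password, and the selector expression are all hypothetical.

```
# application.conf (sketch; section name and values are hypothetical)
my-app.jms.browse-audit = ${alpakka.jms.browse} {
  # Replaces the default "credentials = off" with explicit credentials.
  credentials {
    username = "audit-user"
    password = "change-me"
  }

  # Browse only messages matching this selector.
  selector = "JMSPriority > 4"
}
```

Such a section could then be resolved with `system.settings.config.getConfig("my-app.jms.browse-audit")` and passed to the `JmsBrowseSettings` factory together with the connection factory, in the same way as `browseConfig` in the settings snippet earlier on this page. The queue still has to be set in code with `withQueue`.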