MQTT v5
MQTT stands for MQ Telemetry Transport. It is an extremely simple and lightweight publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also make the protocol ideal for the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.
Further information is available at mqtt.org.
The Alpakka MQTT v5 connector provides an Akka Stream source, sink and flow to connect to MQTT brokers. It is based on the Eclipse Paho Java client.
Project Info: Alpakka MQTT v5 | |
---|---|
Artifact | com.lightbend.akka akka-stream-alpakka-mqttv5 2.0.0 |
JDK versions | Adopt OpenJDK 11 |
Scala versions | 2.13.11 |
JPMS module name | akka.stream.alpakka.mqttv5 |
License | |
Readiness level | Since 2.0, 2023-09-04 |
Home page | https://doc.akka.io/docs/akka-enhancements/current/index.html |
API documentation | |
Forums | |
Issues | Issue tracker |
Sources | https://github.com/lightbend/akka-enhancements |
Artifacts
These dependencies are available on request from Lightbend; please contact your sales representative.
Additionally, add the dependencies as below.
- sbt

      val AkkaVersion = "2.8.4"
      libraryDependencies ++= Seq(
        "com.lightbend.akka" %% "akka-stream-alpakka-mqttv5" % "2.0.0",
        "com.typesafe.akka" %% "akka-stream" % AkkaVersion
      )

- Maven

      <properties>
        <akka.version>2.8.4</akka.version>
        <scala.binary.version>2.13</scala.binary.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>com.lightbend.akka</groupId>
          <artifactId>akka-stream-alpakka-mqttv5_${scala.binary.version}</artifactId>
          <version>2.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-stream_${scala.binary.version}</artifactId>
          <version>${akka.version}</version>
        </dependency>
      </dependencies>

- Gradle

      def versions = [
        AkkaVersion: "2.8.4",
        ScalaBinary: "2.13"
      ]
      dependencies {
        implementation "com.lightbend.akka:akka-stream-alpakka-mqttv5_${versions.ScalaBinary}:2.0.0"
        implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
      }
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies

Organization | Artifact | Version |
---|---|---|
com.typesafe.akka | akka-stream_2.13 | 2.8.4 |
org.eclipse.paho | org.eclipse.paho.mqttv5.client | 1.2.5 |
org.scala-lang | scala-library | 2.13.11 |

- Dependency tree

      com.typesafe.akka akka-stream_2.13 2.8.4 BUSL-1.1
          com.typesafe.akka akka-actor_2.13 2.8.4 BUSL-1.1
              com.typesafe config 1.4.2 Apache-2.0
              org.scala-lang.modules scala-java8-compat_2.13 1.0.0 Apache-2.0
                  org.scala-lang scala-library 2.13.11 Apache-2.0
              org.scala-lang scala-library 2.13.11 Apache-2.0
          com.typesafe.akka akka-protobuf-v3_2.13 2.8.4 BUSL-1.1
          com.typesafe ssl-config-core_2.13 0.6.1 Apache-2.0
              com.typesafe config 1.4.2 Apache-2.0
              org.scala-lang scala-library 2.13.11 Apache-2.0
          org.reactivestreams reactive-streams 1.0.4 MIT-0
          org.scala-lang scala-library 2.13.11 Apache-2.0
      org.eclipse.paho org.eclipse.paho.mqttv5.client 1.2.5
      org.scala-lang scala-library 2.13.11 Apache-2.0
Settings
The required MqttConnectionSettings settings to connect to an MQTT server are:
- the MQTT broker address
- a unique ID for the client (setting it to the empty string should let the MQTT broker assign it, but not all do; you might want to generate it)
- the MQTT client persistence to use (e.g. org.eclipse.paho.mqttv5.client.persist.MemoryPersistence), which allows you to control reliability guarantees
- Scala

      import akka.stream.alpakka.mqttv5._
      import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence

      val connectionSettings = MqttConnectionSettings(
        "tcp://localhost:1883", // (1)
        "test-scala-client", // (2)
        new MemoryPersistence // (3)
      )

- Java

      import akka.stream.alpakka.mqttv5.*;
      import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;

      MqttConnectionSettings connectionSettings =
          MqttConnectionSettings.create(
              "tcp://localhost:1883", // (1)
              "test-java-client", // (2)
              new MemoryPersistence() // (3)
              );
Most settings are passed on to Paho’s org.eclipse.paho.mqttv5.client.MqttConnectionOptions.
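Since not every broker assigns an ID when the client ID is empty, one option is to generate the ID on the client. The following is a minimal sketch using only the JDK; the class name `ClientIds` and the prefix are hypothetical. It relies on the MQTT specification's rule that every server must accept client IDs of 1 to 23 characters drawn from `0-9`, `a-z` and `A-Z`; many brokers accept more, so this is a conservative assumption rather than a requirement of the connector.

```java
import java.security.SecureRandom;

/** Hypothetical helper: builds client IDs that every MQTT server is required to accept. */
final class ClientIds {
    // Characters and length that the MQTT specification obliges servers to allow.
    private static final String ALPHABET =
        "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static final int MAX_LENGTH = 23;
    private static final SecureRandom RANDOM = new SecureRandom();

    /**
     * Returns the prefix padded with random alphanumeric characters to 23 characters.
     * The prefix itself should be alphanumeric to stay within the guaranteed character set.
     */
    static String generate(String prefix) {
        if (prefix.length() >= MAX_LENGTH) {
            throw new IllegalArgumentException(
                "prefix must be shorter than " + MAX_LENGTH + " characters");
        }
        StringBuilder id = new StringBuilder(prefix);
        while (id.length() < MAX_LENGTH) {
            id.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
        }
        return id.toString();
    }
}
```

The result of `ClientIds.generate("sensor")` could then be passed as the second argument when creating the `MqttConnectionSettings`.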
Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider using empty client IDs so that unique identifiers are generated automatically, and consider restarting failed streams after a delay. By design, the underlying Paho library’s auto-reconnect feature does not handle initial connections.
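To make "delayed stream restarts" concrete, the sketch below computes a capped exponential backoff schedule using only the JDK. The class name `RestartBackoff` and the doubling policy are assumptions chosen for illustration, and the random jitter that real implementations usually add is left out. In an Akka Streams application, `RestartSource.withBackoff` is one way to apply such a policy to a source.

```java
import java.time.Duration;

/** Hypothetical helper: capped exponential backoff between stream restarts (no jitter). */
final class RestartBackoff {
    /**
     * Delay to wait before restart number restartCount (0-based):
     * minBackoff doubled restartCount times, never exceeding maxBackoff.
     */
    static Duration delayFor(int restartCount, Duration minBackoff, Duration maxBackoff) {
        Duration delay = minBackoff;
        // Stop doubling as soon as the cap is reached to avoid needless growth.
        for (int i = 0; i < restartCount && delay.compareTo(maxBackoff) < 0; i++) {
            delay = delay.multipliedBy(2);
        }
        return delay.compareTo(maxBackoff) > 0 ? maxBackoff : delay;
    }
}
```

With a minimum of one second and a maximum of 30 seconds, successive restarts wait 1, 2, 4, 8 and 16 seconds, and 30 seconds from then on.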
Configure encrypted connections
To connect with transport-level security, configure the address as ssl://, set authentication details and pass in a socket factory.
- Scala

      import akka.util.ByteString
      import javax.net.ssl.SSLContext

      val connectionSettings = MqttConnectionSettings("ssl://localhost:1885", "ssl-client", new MemoryPersistence)
        .withAuth("mqttUser", ByteString("mqttPassword"))
        .withSocketFactory(SSLContext.getDefault.getSocketFactory)

- Java

      import akka.util.ByteString;
      import javax.net.ssl.SSLContext;

      MqttConnectionSettings connectionSettings =
          MqttConnectionSettings.create("ssl://localhost:1885", "ssl-client", new MemoryPersistence())
              .withAuth("mqttUser", ByteString.fromString("mqttPassword"))
              .withSocketFactory(SSLContext.getDefault().getSocketFactory());
Reading from MQTT
At most once
Create a source that connects to the MQTT server and receives messages from the subscribed topics.
The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala

      val mqttSource: Source[MqttMessage, Future[Done]] =
        MqttSource.atMostOnce(
          connectionSettings.withClientId(clientId = "source-spec/source"),
          MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
          bufferSize = 8)

      val (subscribed, received) = mqttSource
        .take(messages.size)
        .toMat(Sink.seq)(Keep.both)
        .run()

- Java

      MqttSubscriptions subscriptions =
          MqttSubscriptions.create(topic1, MqttQoS.atMostOnce())
              .addSubscription(topic2, MqttQoS.atMostOnce());

      Source<MqttMessage, CompletionStage<Done>> mqttSource =
          MqttSource.atMostOnce(
              connectionSettings.withClientId("source-test/source"), subscriptions, bufferSize);

      Pair<CompletionStage<Done>, CompletionStage<List<String>>> materialized =
          mqttSource
              .map(m -> m.topic() + "-" + m.payload().utf8String())
              .take(messageCount * 2)
              .toMat(Sink.seq(), Keep.both())
              .run(system);

      CompletionStage<Done> subscribed = materialized.first();
      CompletionStage<List<String>> streamResult = materialized.second();
This source has a materialized value (Future[Done] in Scala, CompletionStage<Done> in Java) which is completed when the subscription to the MQTT broker has been established.
The MQTT atMostOnce source automatically acknowledges messages back to the server when they are passed downstream.
At least once
The atLeastOnce source allows users to acknowledge messages anywhere downstream. Please note that for manual acks to work, CleanStart should be set to false and MqttQoS should be AtLeastOnce.
The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala

      val mqttSource: Source[MqttMessageWithAck, Future[Done]] =
        MqttSource.atLeastOnce(
          connectionSettings.withCleanStart(false),
          MqttSubscriptions(topic2, MqttQoS.AtLeastOnce),
          bufferSize = 8)

- Java

      Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
          MqttSource.atLeastOnce(
              connectionSettings
                  .withClientId("source-test/source-withoutAutoAck")
                  .withCleanStart(false),
              MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
              bufferSize);
The atLeastOnce source emits MqttMessageWithAck elements, so you can acknowledge them by calling ack().
- Scala

      val (subscribed, received) = mqttSource
        .via(businessLogic)
        .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ => messageWithAck.message))
        .toMat(Sink.seq)(Keep.both)
        .run()

- Java

      final CompletionStage<List<MqttMessage>> result =
          mqttSource
              .via(businessLogic)
              .mapAsync(
                  1,
                  messageWithAck ->
                      messageWithAck.ack().thenApply(unused2 -> messageWithAck.message()))
              .take(input.size())
              .runWith(Sink.seq(), system);
Publishing to MQTT
To publish messages to the MQTT server, create a sink by specifying MqttConnectionSettings and a default Quality of Service level.
- Scala

      val sink: Sink[MqttMessage, Future[Done]] =
        MqttSink(connectionSettings, MqttQoS.AtLeastOnce)

      Source(messages)
        .runWith(sink)

- Java

      Sink<MqttMessage, CompletionStage<Done>> mqttSink =
          MqttSink.create(connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce());

      Source.from(messages).runWith(mqttSink, system);
The Quality of Service-level and the retained flag can be configured on a per-message basis.
- Scala

      val lastWill = MqttMessage(willTopic, ByteString("ohi"))
        .withQos(MqttQoS.AtLeastOnce)
        .withRetained(true)

- Java

      MqttMessage lastWill =
          MqttMessage.create(willTopic, ByteString.fromString("ohi"))
              .withQos(MqttQoS.atLeastOnce())
              .withRetained(true);
Publishing with a pass-through
It is additionally possible to create a publishing-only flow. The flow will emit a “pass-through” element for each successful publish; the pass-through could, for instance, be a Kafka offset to be committed or an ActorRef to which an acknowledgement should be sent after successfully publishing. To associate a pass-through element (for this example, the offset) with a message to be published, the message and pass-through are wrapped in a PublishWithPassThrough:
- Scala

      import akka.stream.alpakka.mqttv5.scaladsl.PublishWithPassThrough

      PublishWithPassThrough(mqttMessage, offset)

- Java

      PublishWithPassThrough<CommittableOffset> toPublish =
          PublishWithPassThrough.create(message, offset);
Publishing-only flows with pass-through support are available through the factories in MqttPublisher. The flows optionally accept a default Quality of Service level; if no default is provided, every message must have a specific Quality of Service set (or the flow will fail). The flows also require declaring a publishing concurrency: they will backpressure when that many messages are pending.
The effective Quality of Service of a message is the one explicitly set on that message or, if none is set, the default provided to the flow. For messages where the effective Quality of Service is AtMostOnce (atMostOnce() in Java), the pass-through will not be emitted before the underlying Paho client reports that it has sent the message over the network. An effective Quality of Service of AtLeastOnce (atLeastOnce() in Java) will result in no pass-through being emitted before the Paho client reports that the server has accepted the message. The ExactlyOnce (exactlyOnce() in Java) Quality of Service will result in no pass-through until the server has acknowledged our acknowledgement of its acceptance of the message.
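The rules above can be summarised in a small model. This is only an illustrative sketch in plain Java: `EffectiveQos`, its `Qos` enum and both methods are hypothetical names, not part of the connector's API, and the exception merely stands in for the flow failing.

```java
import java.util.Optional;

/** Hypothetical model of how the effective QoS is chosen and what it waits for. */
final class EffectiveQos {
    enum Qos { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }

    /** A per-message QoS wins; otherwise the flow default applies; with neither, publishing fails. */
    static Qos resolve(Optional<Qos> perMessage, Optional<Qos> flowDefault) {
        if (perMessage.isPresent()) {
            return perMessage.get();
        }
        if (flowDefault.isPresent()) {
            return flowDefault.get();
        }
        throw new IllegalStateException("no QoS set on the message and no default provided");
    }

    /** Describes the event that must happen before the pass-through is emitted. */
    static String passThroughEmittedAfter(Qos effective) {
        switch (effective) {
            case AT_MOST_ONCE:
                return "the client reports the message was sent over the network";
            case AT_LEAST_ONCE:
                return "the client reports the server accepted the message";
            default:
                return "the server acknowledged the client's acknowledgement of acceptance";
        }
    }
}
```

The higher the effective Quality of Service, the later the pass-through is emitted, which is worth keeping in mind when choosing the publishing concurrency.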
The orderedFlow will preserve the ordering of incoming messages in its emissions. This makes it suitable for pass-throughs which convey a “high-water-mark” in processing, such as Kafka or Akka Persistence offsets or Event Hub checkpoints. This flow is susceptible to head-of-line blocking if an earlier message takes longer to publish than later messages. In addition, when a message fails to publish, some pass-throughs from messages that did in fact publish successfully might not be seen by the downstream. It is therefore especially suited to applications which are built from the ground up on at-least-once processing.
- Scala

      import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

      MqttPublisher.orderedFlow[CommittableOffset](connectionSettings, Some(MqttQoS.AtMostOnce), 10)

- Java

      Flow<PublishWithPassThrough<CommittableOffset>, CommittableOffset, MqttControl> publishingFlow =
          MqttPublisher.orderedFlow(connectionSettings, MqttQoS.atMostOnce(), 10);
The unorderedFlow, conversely, does not promise to preserve order, which means it is not susceptible to head-of-line blocking. If the pass-through elements can be considered completely independently, as might be the case if the pass-through is an ActorRef and a message to send to that actor, this may be the most suitable choice.
- Scala

      import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

      MqttPublisher.unorderedFlow[CommittableOffset](connectionSettings, Some(MqttQoS.AtLeastOnce), 10)

- Java

      Flow<PublishWithPassThrough<CommittableOffset>, CommittableOffset, MqttControl> publishingFlow =
          MqttPublisher.unorderedFlow(connectionSettings, MqttQoS.atLeastOnce(), 10);
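The difference between ordered and unordered emission can be illustrated without any MQTT connection. The sketch below is a plain-Java simulation under stated assumptions: messages are numbered 0 to n-1 in arrival order, the input lists the order in which their publishes complete, and `EmissionOrder` is a hypothetical name rather than part of the connector.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of pass-through emission for ordered and unordered publishing. */
final class EmissionOrder {
    /** Unordered: each pass-through is emitted as soon as its own publish completes. */
    static List<Integer> unordered(List<Integer> completionOrder) {
        return new ArrayList<>(completionOrder);
    }

    /**
     * Ordered: a pass-through is emitted only after all earlier messages have been emitted.
     * Returns, for each completion event in turn, the pass-throughs released by that event.
     * Assumes completionOrder is a permutation of 0..n-1.
     */
    static List<List<Integer>> ordered(List<Integer> completionOrder) {
        boolean[] done = new boolean[completionOrder.size()];
        int next = 0; // earliest message whose pass-through has not been emitted yet
        List<List<Integer>> released = new ArrayList<>();
        for (int completed : completionOrder) {
            done[completed] = true;
            List<Integer> batch = new ArrayList<>();
            while (next < done.length && done[next]) {
                batch.add(next++);
            }
            released.add(batch);
        }
        return released;
    }
}
```

If the publishes complete in the order 1, 2, 0, 3, the ordered variant releases nothing for the first two completions, then 0, 1 and 2 together once message 0 completes, and finally 3. That delay is the head-of-line blocking described above, and if message 0 never completed, the pass-throughs for 1 and 2 would never be released.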
Akka Streams has support for context propagation alongside stream elements in streams that may drop, but cannot reorder, elements. Taking advantage of this support is possible with the “withContext” flows. For example, in situations where the only message-specific information needed downstream is contained in the context (e.g. a Kafka or projection offset from a SourceWithContext), the doneFlowWithContext will emit a Done as pass-through for every message while propagating the context.
- Scala

      import akka.stream.alpakka.mqttv5.scaladsl.MqttPublisher

      MqttPublisher.doneFlowWithContext[CommittableOffset](connectionSettings, Some(MqttQoS.AtLeastOnce), 10)

- Java

      FlowWithContext<MqttMessage, CommittableOffset, Done, CommittableOffset, MqttControl> publishingFlow =
          MqttPublisher.doneFlowWithContext(connectionSettings, MqttQoS.atLeastOnce(), 10);
Because FlowWithContext is not allowed to reorder elements, the withContext variants always have the semantics of the orderedFlow.
The publishing-only flows have an MqttControl as their materialized value. The materialized control object allows one to request that the flow drain() before it shuts down. When draining, the flow will cease demanding new messages to publish but will wait for previously demanded messages to publish (or fail to publish). The Future (Scala) or CompletionStage (Java) returned from drain() will complete when the flow starts to drain, not when the flow eventually completes.
- Scala

      import system.dispatcher

      val control: MqttControl =
        source
          .map { incomingMessage =>
            PublishWithPassThrough(incomingMessage.messageToPublish, incomingMessage.offset)
          }
          .viaMat(MqttPublisher.orderedFlow(connectionSettings, None, 10))(Keep.right)
          .wireTap { offset =>
            system.log.info("Successfully published to MQTT based on offset {}", offset)
          }
          .to(offsetCommittingSink)
          .run()

      val drainingStarted = control.drain()
      drainingStarted.foreach { _ =>
        system.log.info("Stream is draining")
      }

- Java

      MqttControl control =
          source
              .map(
                  incomingMessage ->
                      PublishWithPassThrough.create(
                          incomingMessage.getMessageToPublish(), incomingMessage.getOffset()))
              .viaMat(MqttPublisher.orderedFlow(connectionSettings, 10), Keep.right())
              .wireTap(
                  Sink.foreach(
                      offset ->
                          log.info("Successfully published to MQTT based on offset {}", offset)))
              .to(offsetCommittingSink)
              .run(system);

      CompletionStage<Done> drainingStarted = control.drain();
      drainingStarted.thenAcceptAsync(
          done -> log.info("Stream is draining"), system.dispatcher());
It is also possible to call shutdown() on the control. This will cause the flow to cease demand and complete as soon as possible, ignoring whether in-flight publishing attempts succeed or fail. The Future (Scala) or CompletionStage (Java) returned will complete when the flow disconnects from the MQTT server.
Publish and subscribe in a single flow
It is also possible to connect to the MQTT server in bidirectional fashion, using a single underlying connection (and client ID). To do that, create an MQTT flow that combines the functionalities of an MQTT source and an MQTT sink.
The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.
- Scala

      val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] =
        MqttFlow.atMostOnce(
          connectionSettings.withClientId("flow-spec/flow"),
          MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
          bufferSize = 8,
          MqttQoS.AtLeastOnce)

- Java

      final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
          MqttFlow.atMostOnce(
              connectionSettings,
              MqttSubscriptions.create("flow-test/topic", MqttQoS.atMostOnce()),
              bufferSize,
              MqttQoS.atLeastOnce());
Run the flow by connecting a source of messages to be published and a sink for received messages.
- Scala

      val ((mqttMessagePromise, subscribed), result) = source
        .viaMat(mqttFlow)(Keep.both)
        .toMat(Sink.seq)(Keep.both)
        .run()

- Java

      final Pair<
              Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>,
              CompletionStage<List<MqttMessage>>>
          materialized =
              source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);

      CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first();
      CompletionStage<Done> subscribedToMqtt = materialized.first().second();
      CompletionStage<List<MqttMessage>> streamResult = materialized.second();
Using a flow with acknowledgement on message sent
It is possible to create a flow that receives MqttMessageWithAck instead of MqttMessage. In this case, when the message is successfully sent to the broker, an ack is sent. This flow can be used when the source must be acknowledged only when the message is successfully sent to the destination topic. This provides at-least-once semantics.
The flow emits MqttMessageWithAck elements with the message swapped for the new content, keeping the ack function from the original source.
- Scala

      val mqttFlow: Flow[MqttMessageWithAck, MqttMessageWithAck, Future[Done]] =
        MqttFlow.atLeastOnceWithAck(
          connectionSettings,
          MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
          bufferSize = 8,
          MqttQoS.AtLeastOnce)

- Java

      final Flow<MqttMessageWithAck, MqttMessageWithAck, CompletionStage<Done>> mqttFlow =
          MqttFlow.atLeastOnceWithAck(
              connectionSettings,
              MqttSubscriptions.create("flow-test/topic-ack", MqttQoS.atMostOnce()),
              bufferSize,
              MqttQoS.atLeastOnce());
Run the flow by connecting a source of messages to be published and a sink for received messages. When a message has been sent, its ack is called.
- Scala

      val (subscribed, result) = source
        .viaMat(mqttFlow)(Keep.right)
        .toMat(Sink.seq)(Keep.both)
        .run()

- Java

      final Pair<Pair<NotUsed, CompletionStage<Done>>, CompletionStage<List<MqttMessageWithAck>>>
          materialized =
              source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(system);
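The idea of swapping the message while keeping the original ack function can be shown with a small stand-in type. This is a sketch in plain Java, not the connector's implementation: `AckCarrier` is a hypothetical substitute for MqttMessageWithAck, with a `String` payload and a `Supplier` acting as the ack function.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical stand-in for MqttMessageWithAck: a payload plus the ack function of its source. */
final class AckCarrier {
    final String message;
    private final Supplier<CompletableFuture<Void>> ackFunction;

    AckCarrier(String message, Supplier<CompletableFuture<Void>> ackFunction) {
        this.message = message;
        this.ackFunction = ackFunction;
    }

    /** Returns a copy carrying the new payload but the same, original ack function. */
    AckCarrier withMessage(String newMessage) {
        return new AckCarrier(newMessage, ackFunction);
    }

    /** Invokes the original source's ack function. */
    CompletableFuture<Void> ack() {
        return ackFunction.get();
    }
}
```

Calling `ack()` on the swapped element therefore acknowledges the message that was originally received, which is why acknowledging only after a successful publish gives at-least-once behaviour.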
Capturing MQTT client logging
The Paho library uses its own logging adapter and contains a default implementation that uses java.util.logging. See Paho/Log and Debug.