MQTT

MQTT stands for MQ Telemetry Transport. It is an extremely simple and lightweight publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimise network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal for the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Further information is available on mqtt.org.

Streaming Differences

Alpakka contains another MQTT connector which is not based on the Eclipse Paho client, unlike this one. Please refer to the other connector where the differences are expanded on.

The Alpakka MQTT connector provides an Akka Stream source, sink and flow to connect to MQTT brokers. It is based on the Eclipse Paho Java client.

Project Info: Alpakka MQTT
Artifact: com.lightbend.akka : akka-stream-alpakka-mqtt : 1.0-M2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12
JPMS module name: akka.stream.alpakka.mqtt
License
Readiness level: Community-driven, since 0.1, 2016-11-10
Home page: https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mqtt" % "1.0-M2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mqtt_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mqtt_2.12', version: '1.0-M2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
org.eclipse.paho    org.eclipse.paho.client.mqttv3    1.2.0    Eclipse Public License - Version 1.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.19    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.19    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.19    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.6    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.eclipse.paho    org.eclipse.paho.client.mqttv3    1.2.0    Eclipse Public License - Version 1.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Settings

The settings required by MqttConnectionSettings (API) to connect to an MQTT server are:

  1. the MQTT broker address
  2. a unique ID for the client (setting it to the empty string should let the MQTT broker assign one, but not all brokers do; you might want to generate it yourself)
  3. the MQTT client persistence to use (e.g. MemoryPersistence), which lets you control reliability guarantees
Scala
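import akka.stream.alpakka.mqtt.MqttConnectionSettings
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

val connectionSettings = MqttConnectionSettings(
  "tcp://localhost:1883", // (1) broker address
  "test-scala-client", // (2) client ID
  new MemoryPersistence // (3) client persistence
)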
Java
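import akka.stream.alpakka.mqtt.MqttConnectionSettings;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create(
        "tcp://localhost:1883", // (1) broker address
        "test-java-client", // (2) client ID
        new MemoryPersistence() // (3) client persistence
        );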

Most settings are passed on to Paho’s MqttConnectOptions (API) and documented there.
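
Since not every broker assigns an ID when given the empty string, one option is to generate the client ID yourself. The following is a minimal sketch using only the Java standard library. ClientIds and unique are hypothetical names, not part of Alpakka or Paho. The sketch assumes the conservative limit from MQTT 3.1.1, under which brokers are only required to accept IDs of 1 to 23 alphanumeric characters; many brokers accept longer IDs and further characters.

```java
import java.util.UUID;

// Hypothetical helper for illustration; not part of Alpakka or Paho.
final class ClientIds {
    // MQTT 3.1.1 only requires brokers to accept client IDs of up to 23 characters.
    static final int MAX_LENGTH = 23;

    private ClientIds() {}

    // Returns the prefix followed by random hex digits, MAX_LENGTH characters in total.
    // The prefix is assumed to be alphanumeric; this is not checked here.
    static String unique(String prefix) {
        // Leave room for at least 8 random hex digits after the prefix.
        if (prefix.length() > MAX_LENGTH - 8) {
            throw new IllegalArgumentException("prefix too long: " + prefix);
        }
        String randomHex = UUID.randomUUID().toString().replace("-", "");
        return (prefix + randomHex).substring(0, MAX_LENGTH);
    }

    public static void main(String[] args) {
        System.out.println(ClientIds.unique("sensor"));
    }
}
```

The generated value can then be passed as the client ID, for example via connectionSettings.withClientId(ClientIds.unique("sensor")).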

Use delayed stream restarts

Note that the following examples do not provide any connection management and are designed to get you going quickly. Consider using empty client IDs, so that the broker generates unique identifiers, and delayed stream restarts. By design, the underlying Paho library’s auto-reconnect feature does not handle initial connections.
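
Delayed restarts are typically configured with a minimum backoff, a maximum backoff and a random factor, as in Akka Streams' RestartSource.withBackoff. To give a feel for the resulting schedule, here is a minimal sketch of an exponential backoff calculation in plain Java. Backoff and delay are hypothetical names, and the formula is an illustrative assumption rather than the exact calculation Akka performs.

```java
import java.time.Duration;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating a backoff schedule; not an Akka or Alpakka API.
final class Backoff {
    private Backoff() {}

    // Delay before restart number restartCount (0-based): minBackoff doubled for
    // every previous restart, capped at maxBackoff, then stretched by a random
    // amount of up to randomFactor (0.2 means up to 20% longer).
    static Duration delay(int restartCount, Duration minBackoff, Duration maxBackoff, double randomFactor) {
        double jitter = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
        double exponential = minBackoff.toMillis() * Math.pow(2, restartCount);
        long capped = (long) Math.min(maxBackoff.toMillis(), exponential);
        return Duration.ofMillis((long) (capped * jitter));
    }

    public static void main(String[] args) {
        for (int restart = 0; restart < 7; restart++) {
            System.out.println(delay(restart, Duration.ofSeconds(1), Duration.ofSeconds(30), 0.2));
        }
    }
}
```

The random factor spreads out reconnection attempts, so that many clients restarting at the same moment do not all hit the broker at once.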

Configure encrypted connections

To connect with transport-level security, configure the address as ssl://, set the authentication details and pass in a socket factory.

Scala
import javax.net.ssl.SSLContext

val connectionSettings = MqttConnectionSettings(
  "ssl://localhost:1885",
  "ssl-client",
  new MemoryPersistence
).withAuth("mqttUser", "mqttPassword")
  .withSocketFactory(SSLContext.getDefault.getSocketFactory)
Java
import javax.net.ssl.SSLContext;

MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create("ssl://localhost:1885", "ssl-client", new MemoryPersistence())
        .withAuth("mqttUser", "mqttPassword")
        .withSocketFactory(SSLContext.getDefault().getSocketFactory());

Reading from MQTT

At most once

Then let’s create a source that connects to the MQTT server and receives messages from the subscribed topics.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessage, Future[Done]] =
  MqttSource.atMostOnce(
    connectionSettings.withClientId(clientId = "source-spec/source"),
    MqttSubscriptions(Map(topic1 -> MqttQoS.AtLeastOnce, topic2 -> MqttQoS.AtLeastOnce)),
    bufferSize = 8
  )

val (subscribed, streamResult) = mqttSource
  .take(messages.size)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
MqttSubscriptions subscriptions =
    MqttSubscriptions.create(topic1, MqttQoS.atMostOnce())
        .addSubscription(topic2, MqttQoS.atMostOnce());

Source<MqttMessage, CompletionStage<Done>> mqttSource =
    MqttSource.atMostOnce(
        connectionSettings.withClientId("source-test/source"), subscriptions, bufferSize);

Pair<CompletionStage<Done>, CompletionStage<List<String>>> materialized =
    mqttSource
        .map(m -> m.topic() + "-" + m.payload().utf8String())
        .take(messageCount * 2)
        .toMat(Sink.seq(), Keep.both())
        .run(materializer);

CompletionStage<Done> subscribed = materialized.first();
CompletionStage<List<String>> streamResult = materialized.second();

This source has a materialized value (Future[Done] in Scala, CompletionStage<Done> in Java) which is completed when the subscription to the MQTT broker has been established.

MQTT atMostOnce automatically acknowledges messages back to the server when they are passed downstream.

At least once

The atLeastOnce source allows users to acknowledge messages anywhere downstream. Note that for manual acks to work, CleanSession should be set to false and MqttQoS should be AtLeastOnce.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttSource: Source[MqttMessageWithAck, Future[Done]] =
  MqttSource.atLeastOnce(
    connectionSettings
      .withClientId(clientId = "source-spec/source1")
      .withCleanSession(false),
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8
  )
Java
Source<MqttMessageWithAck, CompletionStage<Done>> mqttSource =
    MqttSource.atLeastOnce(
        connectionSettings
            .withClientId("source-test/source-withoutAutoAck")
            .withCleanSession(false),
        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()),
        bufferSize);

The atLeastOnce source emits MqttMessageWithAck elements, so you can acknowledge each message by calling ack().

Scala
val result = mqttSource
  .via(businessLogic)
  .mapAsync(1)(messageWithAck => messageWithAck.ack().map(_ => messageWithAck.message))
  .take(input.size)
  .runWith(Sink.seq)
Java
final CompletionStage<List<MqttMessage>> result =
    mqttSource
        .via(businessLogic)
        .mapAsync(
            1,
            messageWithAck ->
                messageWithAck.ack().thenApply(unused2 -> messageWithAck.message()))
        .take(input.size())
        .runWith(Sink.seq(), materializer);
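
The point of acknowledging after the business logic is ordering: a message is only confirmed once it has been processed, so a failure before the ack leaves it unacknowledged. The sketch below illustrates that ordering with plain Java futures. AckableMessage and AckAfterProcessing are hypothetical stand-ins, not the Alpakka types, and the upper-casing step is a placeholder for real business logic.

```java
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for a message with a manual ack; not the Alpakka type.
final class AckableMessage {
    final String payload;
    final List<String> log; // records the order of events for illustration

    AckableMessage(String payload, List<String> log) {
        this.payload = payload;
        this.log = log;
    }

    // Records the acknowledgement and completes immediately.
    CompletionStage<Void> ack() {
        log.add("ack " + payload);
        return CompletableFuture.completedFuture(null);
    }
}

final class AckAfterProcessing {
    private AckAfterProcessing() {}

    // Runs the (placeholder) business logic first and acknowledges afterwards.
    // If the processing step throws, ack() is never called.
    static CompletionStage<String> handle(AckableMessage message) {
        message.log.add("process " + message.payload);
        String result = message.payload.toUpperCase(Locale.ROOT);
        return message.ack().thenApply(unused -> result);
    }
}
```

With this ordering, a crash during processing leaves the message unacknowledged, which is what allows it to be delivered again.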

Publishing to MQTT

To publish messages to the MQTT server, create a sink by specifying MqttConnectionSettings (API) and a default Quality of Service level.

Scala
val sink: Sink[MqttMessage, Future[Done]] =
  MqttSink(connectionSettings, MqttQoS.AtLeastOnce)
Source(messages).runWith(sink)
Java
Sink<MqttMessage, CompletionStage<Done>> mqttSink =
    MqttSink.create(connectionSettings.withClientId("source-test/sink"), MqttQoS.atLeastOnce());
Source.from(messages).runWith(mqttSink, materializer);

The Quality of Service level and the retained flag can be configured on a per-message basis.

Scala
val lastWill = MqttMessage(willTopic, ByteString("ohi"))
  .withQos(MqttQoS.AtLeastOnce)
  .withRetained(true)
Java
MqttMessage lastWill =
    MqttMessage.create(willTopic, ByteString.fromString("ohi"))
        .withQos(MqttQoS.atLeastOnce())
        .withRetained(true);
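
The relationship between the sink's default Quality of Service level and a per-message setting can be pictured as a simple fallback. The sketch below is a stdlib-only illustration. Qos and QosResolution are hypothetical stand-ins, and the fallback rule (a per-message value wins, otherwise the default applies) is an assumption about the behaviour, not a description of the connector's internals. The numeric levels 0, 1 and 2 are those defined by MQTT.

```java
import java.util.Optional;

// Hypothetical stand-in for the QoS levels; not the Alpakka MqttQoS type.
enum Qos {
    AT_MOST_ONCE(0),
    AT_LEAST_ONCE(1),
    EXACTLY_ONCE(2);

    final int level; // numeric QoS level as defined by MQTT

    Qos(int level) {
        this.level = level;
    }
}

final class QosResolution {
    private QosResolution() {}

    // Assumed rule: a QoS set on the message overrides the sink's default.
    static Qos effective(Optional<Qos> perMessage, Qos sinkDefault) {
        return perMessage.orElse(sinkDefault);
    }
}
```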

Publish and subscribe in a single flow

It is also possible to connect to the MQTT server in bidirectional fashion, using a single underlying connection (and client ID). To do that create an MQTT flow that combines the functionalities of an MQTT source and an MQTT sink.

The bufferSize sets the maximum number of messages read from MQTT before back-pressure applies.

Scala
val mqttFlow: Flow[MqttMessage, MqttMessage, Future[Done]] =
  MqttFlow.atMostOnce(
    connectionSettings.withClientId("flow-spec/flow"),
    MqttSubscriptions(topic, MqttQoS.AtLeastOnce),
    bufferSize = 8,
    MqttQoS.AtLeastOnce
  )
Java
final Flow<MqttMessage, MqttMessage, CompletionStage<Done>> mqttFlow =
    MqttFlow.atMostOnce(
        connectionSettings,
        MqttSubscriptions.create("flow-test/topic", MqttQoS.atMostOnce()),
        bufferSize,
        MqttQoS.atLeastOnce());

Run the flow by connecting a source of messages to be published and a sink for received messages.

Scala
val ((mqttMessagePromise, subscribed), result) = source
  .viaMat(mqttFlow)(Keep.both)
  .toMat(Sink.seq)(Keep.both)
  .run()
Java
final Pair<
        Pair<CompletableFuture<Optional<MqttMessage>>, CompletionStage<Done>>,
        CompletionStage<List<MqttMessage>>>
    materialized =
        source.viaMat(mqttFlow, Keep.both()).toMat(Sink.seq(), Keep.both()).run(materializer);

CompletableFuture<Optional<MqttMessage>> mqttMessagePromise = materialized.first().first();
CompletionStage<Done> subscribedToMqtt = materialized.first().second();
CompletionStage<List<MqttMessage>> streamResult = materialized.second();

Capturing MQTT client logging

The Paho library uses its own logging adapter and ships a default implementation that uses java.util.logging. See Paho/Log and Debug.
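
Because the default adapter writes to java.util.logging, the client's output can be raised to a more verbose level with the standard logging API. The following is a minimal sketch. PahoLogging is a hypothetical helper, and it assumes that Paho names its loggers after its classes, so that they inherit from a logger named after the org.eclipse.paho.client.mqttv3 package.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper for illustration; not part of Alpakka or Paho.
final class PahoLogging {
    // Assumption: Paho's loggers are named after classes in this package.
    static final String PAHO_PACKAGE = "org.eclipse.paho.client.mqttv3";

    // Keep a strong reference so the configured logger is not garbage collected.
    private static final Logger PAHO_LOGGER = Logger.getLogger(PAHO_PACKAGE);

    private PahoLogging() {}

    // Sends Paho log records at the given level or above to the console.
    // Intended to be called once; each call adds another console handler.
    static Logger enable(Level level) {
        Handler handler = new ConsoleHandler();
        handler.setLevel(level);
        PAHO_LOGGER.setLevel(level);
        PAHO_LOGGER.setUseParentHandlers(false); // avoid printing records twice
        PAHO_LOGGER.addHandler(handler);
        return PAHO_LOGGER;
    }
}
```

Calling PahoLogging.enable(Level.FINE) before the stream starts would then print the client's fine-grained log records to the console.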

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

The test code requires an MQTT server running in the background. You can start one quickly using Docker:

docker-compose up mqtt

Scala
sbt
> mqtt/testOnly *.MqttSourceSpec
Java
sbt
> mqtt/testOnly *.MqttSourceTest