MQTT Streaming

MQTT Streaming

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. The design principles are to minimize network bandwidth and device resource requirements whilst also attempting to ensure reliability and some degree of assurance of delivery. These principles also turn out to make the protocol ideal of the emerging “machine-to-machine” (M2M) or “Internet of Things” world of connected devices, and for mobile applications where bandwidth and battery power are at a premium.

Further information on mqtt.org.

Paho Differences

Alpakka contains another MQTT connector which is based on the Eclipse Paho client. Unlike the Paho version, this library has no dependencies other than those of Akka Streams i.e. it is entirely reactive. As such, there should be a significant performance advantage given its pure-Akka foundations in terms of memory usage given its diligent use of threads.

This library also differs in that it separates out the concern of how MQTT is connected. Unlike Paho, where TCP is assumed, this library can join in any flow. The end result is that by using this library, Unix Domain Sockets, TCP, UDP or anything else can be used to transport MQTT.

The Alpakka MQTT connector provides an Akka Stream flow to connect to MQTT brokers. In addition, a flow is provided so that you can implement your own MQTT server in the case where you do not wish to use a broker–MQTT is a fine protocol for directed client/server interactions, as well as having an intermediary broker.

Project Info: Alpakka MQTT Streaming
Artifact
com.lightbend.akka
akka-stream-alpakka-mqtt-streaming
1.1.2
JDK versions
OpenJDK 8
Scala versions2.12.7, 2.13.0
JPMS module nameakka.stream.alpakka.mqttStreaming
License
Readiness level
Since 1.0-M2, 2019-01-17
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-mqtt-streaming" % "1.1.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-mqtt-streaming_2.12</artifactId>
  <version>1.1.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-mqtt-streaming_2.12', version: '1.1.2'
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersionLicense
com.typesafe.akkaakka-actor-typed_2.122.5.23Apache License, Version 2.0
com.typesafe.akkaakka-stream-typed_2.122.5.23Apache License, Version 2.0
com.typesafe.akkaakka-stream_2.122.5.23Apache License, Version 2.0
org.scala-langscala-library2.12.7BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-actor-typed_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-stream-typed_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor-typed_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
            com.typesafe    config    1.3.3    Apache License, Version 2.0
            org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
                org.scala-lang    scala-library    2.12.7    BSD 3-Clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.reactivestreams    reactive-streams    1.0.2    CC0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Flow through a client session

The following code illustrates how to establish an MQTT client session and join it with a TCP connection:

Scala
val settings = MqttSessionSettings()
val session = ActorMqttClientSession(settings)

val connection = Tcp().outgoingConnection("localhost", 1883)

val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
  Mqtt
    .clientSessionFlow(session, ByteString("1"))
    .join(connection)
Java
MqttSessionSettings settings = MqttSessionSettings.create();
MqttClientSession session = ActorMqttClientSession.create(settings, materializer, system);

Flow<ByteString, ByteString, CompletionStage<Tcp.OutgoingConnection>> connection =
    Tcp.get(system).outgoingConnection("localhost", 1883);

Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
    Mqtt.clientSessionFlow(session, ByteString.fromString("1")).join(connection);

The resulting flow’s type shows how Commands are received and Events are emitted. With Event, they can be either decoded successfully or not.

Run the flow by connecting a source of messages to be published via a queue:

Scala
val (commands: SourceQueueWithComplete[Command[Nothing]], events: Future[Publish]) =
  Source
    .queue(2, OverflowStrategy.fail)
    .via(mqttFlow)
    .collect {
      case Right(Event(p: Publish, _)) => p
    }
    .toMat(Sink.head)(Keep.both)
    .run()

commands.offer(Command(Connect(clientId, ConnectFlags.CleanSession)))
commands.offer(Command(Subscribe(topic)))
session ! Command(
  Publish(ControlPacketFlags.RETAIN | ControlPacketFlags.QoSAtLeastOnceDelivery, topic, ByteString("ohi"))
)

// for shutting down properly
commands.complete()
commands.watchCompletion().foreach(_ => session.shutdown())
Java
Pair<SourceQueueWithComplete<Command<Object>>, CompletionStage<Publish>> run =
    Source.<Command<Object>>queue(3, OverflowStrategy.fail())
        .via(mqttFlow)
        .collect(
            new JavaPartialFunction<DecodeErrorOrEvent<Object>, Publish>() {
              @Override
              public Publish apply(DecodeErrorOrEvent<Object> x, boolean isCheck) {
                if (x.getEvent().isPresent() && x.getEvent().get().event() instanceof Publish)
                  return (Publish) x.getEvent().get().event();
                else throw noMatch();
              }
            })
        .toMat(Sink.head(), Keep.both())
        .run(materializer);

SourceQueueWithComplete<Command<Object>> commands = run.first();
commands.offer(new Command<>(new Connect(clientId, ConnectFlags.CleanSession())));
commands.offer(new Command<>(new Subscribe(topic)));
session.tell(
    new Command<>(
        new Publish(
            ControlPacketFlags.RETAIN() | ControlPacketFlags.QoSAtLeastOnceDelivery(),
            topic,
            ByteString.fromString("ohi"))));

// for shutting down properly
commands.complete();
commands.watchCompletion().thenAccept(done -> session.shutdown());

Note that the Publish command is not offered to the command flow given MQTT QoS requirements. Instead, the session is told to perform Publish given that it can retry continuously with buffering until a command flow is established.

We filter the events received as there will be ACKs to our connect, subscribe and publish. The collected event is the publication to the topic we just subscribed to.

To shut down the flow after use, the command queue commands is completed and after its completion the session is shut down.

Flow through a server session

The following code illustrates how to establish an MQTT server session and join it with a TCP binding:

Scala
val settings = MqttSessionSettings()
val session = ActorMqttServerSession(settings)

val maxConnections = 1

val bindSource: Source[Either[MqttCodec.DecodeError, Event[Nothing]], Future[Tcp.ServerBinding]] =
  Tcp()
    .bind(host, 0)
    .flatMapMerge(
      maxConnections, { connection =>
        val mqttFlow: Flow[Command[Nothing], Either[MqttCodec.DecodeError, Event[Nothing]], NotUsed] =
          Mqtt
            .serverSessionFlow(session, ByteString(connection.remoteAddress.getAddress.getAddress))
            .join(connection.flow)

        val (queue, source) = Source
          .queue[Command[Nothing]](3, OverflowStrategy.dropHead)
          .via(mqttFlow)
          .toMat(BroadcastHub.sink)(Keep.both)
          .run()

        val subscribed = Promise[Done]
        source
          .runForeach {
            case Right(Event(_: Connect, _)) =>
              queue.offer(Command(ConnAck(ConnAckFlags.None, ConnAckReturnCode.ConnectionAccepted)))
            case Right(Event(cp: Subscribe, _)) =>
              queue.offer(Command(SubAck(cp.packetId, cp.topicFilters.map(_._2)), Some(subscribed), None))
            case Right(Event(publish @ Publish(flags, _, Some(packetId), _), _))
                if flags.contains(ControlPacketFlags.RETAIN) =>
              queue.offer(Command(PubAck(packetId)))
              import mat.executionContext
              subscribed.future.foreach(_ => session ! Command(publish))
            case _ => // Ignore everything else
          }

        source
      }
    )
Java
MqttSessionSettings settings = MqttSessionSettings.create();
MqttServerSession session = ActorMqttServerSession.create(settings, materializer, system);

int maxConnections = 1;

Source<DecodeErrorOrEvent<Object>, CompletionStage<Tcp.ServerBinding>> bindSource =
    Tcp.get(system)
        .bind(host, port)
        .flatMapMerge(
            maxConnections,
            connection -> {
              Flow<Command<Object>, DecodeErrorOrEvent<Object>, NotUsed> mqttFlow =
                  Mqtt.serverSessionFlow(
                          session,
                          ByteString.fromArray(
                              connection.remoteAddress().getAddress().getAddress()))
                      .join(connection.flow());

              Pair<
                      SourceQueueWithComplete<Command<Object>>,
                      Source<DecodeErrorOrEvent<Object>, NotUsed>>
                  run =
                      Source.<Command<Object>>queue(2, OverflowStrategy.dropHead())
                          .via(mqttFlow)
                          .toMat(BroadcastHub.of(DecodeErrorOrEvent.classOf()), Keep.both())
                          .run(materializer);

              SourceQueueWithComplete<Command<Object>> queue = run.first();
              Source<DecodeErrorOrEvent<Object>, NotUsed> source = run.second();

              CompletableFuture<Done> subscribed = new CompletableFuture<>();
              source.runForeach(
                  deOrE -> {
                    if (deOrE.getEvent().isPresent()) {
                      Event<Object> event = deOrE.getEvent().get();
                      ControlPacket cp = event.event();
                      if (cp instanceof Connect) {
                        queue.offer(
                            new Command<>(
                                new ConnAck(
                                    ConnAckFlags.None(),
                                    ConnAckReturnCode.ConnectionAccepted())));
                      } else if (cp instanceof Subscribe) {
                        Subscribe subscribe = (Subscribe) cp;
                        Collection<Tuple2<String, ControlPacketFlags>> topicFilters =
                            JavaConverters.asJavaCollectionConverter(subscribe.topicFilters())
                                .asJavaCollection();
                        List<Integer> flags =
                            topicFilters.stream()
                                .map(x -> x._2().underlying())
                                .collect(Collectors.toList());
                        queue.offer(
                            new Command<>(
                                new SubAck(subscribe.packetId(), flags),
                                Optional.of(subscribed),
                                Optional.empty()));
                      } else if (cp instanceof Publish) {
                        Publish publish = (Publish) cp;
                        if ((publish.flags() & ControlPacketFlags.RETAIN()) != 0) {
                          int packetId = publish.packetId().get().underlying();
                          queue.offer(new Command<>(new PubAck(packetId)));
                          subscribed.thenRun(() -> session.tell(new Command<>(publish)));
                        }
                      } // Ignore everything else
                    }
                  },
                  materializer);

              return source;
            });

The resulting source’s type shows how Events are received and Commands are queued in reply. Our example acknowledges a connection, subscription and publication. Upon receiving a publication, it is re-published from the server so that any client that is subscribed will receive it. An additional detail is that we hold off re-publishing until we have a subscription from the client. Note also how the session is told to perform Publish commands directly as they will be broadcasted to all clients subscribed to the topic.

Run the flow:

Scala
val (bound: Future[Tcp.ServerBinding], server: UniqueKillSwitch) = bindSource
  .viaMat(KillSwitches.single)(Keep.both)
  .to(Sink.ignore)
  .run()

// for shutting down properly
server.shutdown()
session.shutdown()
Java
Pair<CompletionStage<Tcp.ServerBinding>, UniqueKillSwitch> bindingAndSwitch =
    bindSource.viaMat(KillSwitches.single(), Keep.both()).to(Sink.ignore()).run(materializer);

CompletionStage<Tcp.ServerBinding> bound = bindingAndSwitch.first();
UniqueKillSwitch server = bindingAndSwitch.second();

// for shutting down properly
server.shutdown();
commands.watchCompletion().thenAccept(done -> session.shutdown());

To shut down the server after use, the server flow is shut down via a KillSwitch and the session is shut down.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.