MQTT

Example: Read from an MQTT topic, group messages and publish to Kafka

  • (1) connection details to MQTT broker
  • (2) settings for MQTT source specifying the topic to listen to
  • (3) use helper method to cater for Paho failures on initial connect
  • (4) add a kill switch to allow for stopping the subscription
  • (5) convert incoming ByteString to String
  • (6) parse JSON
  • (7) group up to 50 messages into one, as long as they appear with 5 seconds
  • (8) convert the list of measurements to a JSON array structure
Java
final MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create(
        "tcp://localhost:1883", // (1)
        "coffee-client",
        new MemoryPersistence());

final String topic = "coffee/level";

MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2)

Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
    wrapWithAsRestartSource( // (3)
        () ->
            MqttSource.atMostOnce(
                connectionSettings.withClientId("coffee-control"), subscriptions, 8));

Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions =
    restartingMqttSource
        .viaMat(KillSwitches.single(), Keep.both()) // (4)
        .map(m -> m.payload().utf8String()) // (5)
        .map(measurementReader::readValue) // (6)
        .groupedWithin(50, Duration.ofSeconds(5)) // (7)
        .map(list -> asJsonArray("measurements", list)) // (8)
        .toMat(Sink.foreach(System.out::println), Keep.both())
        .run(materializer);

Restarting of the source

The MQTT source gets wrapped by a RestartSource to mitigate the Paho initial connections problem.

Java
/** Wrap a source with restart logic and exposes an equivalent materialized value. */
<M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
    Creator<Source<M, CompletionStage<Done>>> source) {
  // makes use of the fact that these sources materialize a CompletionStage<Done>
  CompletableFuture<Done> fut = new CompletableFuture<>();
  return RestartSource.withBackoff(
          Duration.ofMillis(100),
          Duration.ofSeconds(3),
          0.2d, // randomFactor
          5, // maxRestarts,
          () ->
              source
                  .create()
                  .mapMaterializedValue(
                      mat ->
                          mat.handle(
                              (done, exception) -> {
                                if (done != null) {
                                  fut.complete(done);
                                } else {
                                  fut.completeExceptionally(exception);
                                }
                                return fut.toCompletableFuture();
                              })))
      .mapMaterializedValue(ignore -> fut.toCompletableFuture());
}

Json helper code

To use Java 8 time types (Instant) with Jackson, extra dependencies are required.

sbt
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.9.6",
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.9.6"
)
Maven
<dependency>
  <groupId>com.fasterxml.jackson.datatype</groupId>
  <artifactId>jackson-datatype-jdk8</artifactId>
  <version>2.9.6</version>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.datatype</groupId>
  <artifactId>jackson-datatype-jsr310</artifactId>
  <version>2.9.6</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: '2.9.6',
  compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.9.6'
}
Java

/** Data elements sent via MQTT broker. */ public static final class Measurement { public final Instant timestamp; public final long level; @JsonCreator public Measurement( @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) { this.timestamp = timestamp; this.level = level; } } private final JsonFactory jsonFactory = new JsonFactory(); final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); final ObjectReader measurementReader = mapper.readerFor(Measurement.class); final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class); private String asJsonArray(String fieldName, List<Object> list) throws IOException { StringWriter sw = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(sw); generator.writeStartObject(); generator.writeFieldName(fieldName); measurementWriter.writeValues(generator).init(true).writeAll(list); generator.close(); return sw.toString(); }

Running the example code

This example is contained in a stand-alone runnable main, it can be run from sbt like this:

sbt
sbt
> doc-examples/run
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.