MQTT
Example: Read from an MQTT topic, group messages and publish to Kafka
- (1) connection details to MQTT broker
- (2) settings for MQTT source specifying the topic to listen to
- (3) use helper method to cater for Paho failures on initial connect
- (4) add a kill switch to allow for stopping the subscription
- (5) convert incoming ByteString to String
- (6) parse JSON
- (7) group up to 50 messages into one, as long as they appear with 5 seconds
- (8) convert the list of measurements to a JSON array structure
- Java
-
final MqttConnectionSettings connectionSettings = MqttConnectionSettings.create( "tcp://localhost:1883", // (1) "coffee-client", new MemoryPersistence()); final String topic = "coffee/level"; MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2) Source<MqttMessage, CompletionStage<Done>> restartingMqttSource = wrapWithAsRestartSource( // (3) () -> MqttSource.atMostOnce( connectionSettings.withClientId("coffee-control"), subscriptions, 8)); Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions = restartingMqttSource .viaMat(KillSwitches.single(), Keep.both()) // (4) .map(m -> m.payload().utf8String()) // (5) .map(measurementReader::readValue) // (6) .groupedWithin(50, Duration.ofSeconds(5)) // (7) .map(list -> asJsonArray("measurements", list)) // (8) .toMat(Sink.foreach(System.out::println), Keep.both()) .run(materializer);
Restarting of the source
The MQTT source gets wrapped by a RestartSource
to mitigate the Paho initial connections problem.
- Java
-
/** Wrap a source with restart logic and exposes an equivalent materialized value. */ <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource( Creator<Source<M, CompletionStage<Done>>> source) { // makes use of the fact that these sources materialize a CompletionStage<Done> CompletableFuture<Done> fut = new CompletableFuture<>(); return RestartSource.withBackoff( Duration.ofMillis(100), Duration.ofSeconds(3), 0.2d, // randomFactor 5, // maxRestarts, () -> source .create() .mapMaterializedValue( mat -> mat.handle( (done, exception) -> { if (done != null) { fut.complete(done); } else { fut.completeExceptionally(exception); } return fut.toCompletableFuture(); }))) .mapMaterializedValue(ignore -> fut.toCompletableFuture()); }
Json helper code
To use Java 8 time types (Instant
) with Jackson, extra dependencies are required.
- sbt
libraryDependencies ++= Seq( "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.9.6", "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.9.6" )
- Maven
<dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-jdk8</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.datatype</groupId> <artifactId>jackson-datatype-jsr310</artifactId> <version>2.9.6</version> </dependency>
- Gradle
dependencies { compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jdk8', version: '2.9.6', compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.9.6' }
- Java
-
/** Data elements sent via MQTT broker. */ public static final class Measurement { public final Instant timestamp; public final long level; @JsonCreator public Measurement( @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) { this.timestamp = timestamp; this.level = level; } } private final JsonFactory jsonFactory = new JsonFactory(); final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); final ObjectReader measurementReader = mapper.readerFor(Measurement.class); final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class); private String asJsonArray(String fieldName, List<Object> list) throws IOException { StringWriter sw = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(sw); generator.writeStartObject(); generator.writeFieldName(fieldName); measurementWriter.writeValues(generator).init(true).writeAll(list); generator.close(); return sw.toString(); }
Running the example code
This example is contained in a stand-alone runnable main, it can be run from sbt
like this:
- sbt
-
sbt > doc-examples/run