Example code

This example uses an MQTT broker which may be started via Docker compose and a Kafka broker which is created from within the JVM by Testcontainers.

Starting the MQTT broker

> docker-compose up -d
docker-compose.yml
sourceversion: '2'
services:
  mqtt:
    image: toke/mosquitto
    ports:
      - "1883:1883"
    volumes:
      - ./src/main/travis:/mqtt/config/conf.d

Restarting of the source

The MQTT source gets wrapped by a RestartSource to mitigate the Paho initial connections problem.

Java
source
/** * Wrap a source with restart logic and exposes an equivalent materialized value. */ <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource( Creator<Source<M, CompletionStage<Done>>> source) { // makes use of the fact that these sources materialize a CompletionStage<Done> CompletableFuture<Done> fut = new CompletableFuture<>(); return RestartSource.withBackoff( Duration.ofMillis(100), Duration.ofSeconds(3), 0.2d, // randomFactor 5, // maxRestarts, () -> source .create() .mapMaterializedValue( mat -> mat.handle( (done, exception) -> { if (done != null) { fut.complete(done); } else { fut.completeExceptionally(exception); } return fut.toCompletableFuture(); }))) .mapMaterializedValue(ignore -> fut.toCompletableFuture()); }
Scala
source/**
 * Wrap a source with restart logic and expose an equivalent materialized value.
 */
private def wrapWithAsRestartSource[M](source: => Source[M, Future[Done]]): Source[M, Future[Done]] = {
  val fut = Promise[Done]
  RestartSource.withBackoff(100.millis, 3.seconds, randomFactor = 0.2d, maxRestarts = 5) {
    () => source.mapMaterializedValue(mat => fut.completeWith(mat))
  }.mapMaterializedValue(_ => fut.future)
}

Json helper code

To use Java 8 time types (Instant) with Jackson, extra dependencies are required.

sbt
libraryDependencies ++= Seq(
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.10.0",
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.10.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.0"
)
Maven
<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jdk8</artifactId>
    <version>2.10.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.10.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
    <version>2.10.0</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.0"
  implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.10.0"
  implementation "com.fasterxml.jackson.module:jackson-module-scala_${versions.ScalaBinary}:2.10.0"
}

Data class and JSON mapping

Java
source
/** * Data elements sent via MQTT broker. */ public static final class Measurement { public final Instant timestamp; public final long level; @JsonCreator public Measurement( @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) { this.timestamp = timestamp; this.level = level; } } private final JsonFactory jsonFactory = new JsonFactory(); final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); final ObjectReader measurementReader = mapper.readerFor(Measurement.class); final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class); private String asJsonArray(String fieldName, List<Object> list) throws IOException { StringWriter sw = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(sw); generator.writeStartObject(); generator.writeFieldName(fieldName); measurementWriter.writeValues(generator).init(true).writeAll(list); generator.close(); return sw.toString(); }
Scala
source/**
 * Data elements sent via MQTT broker.
 */
final case class Measurement(timestamp: Instant, level: Long)

val jsonFactory = new JsonFactory
val mapper = new ObjectMapper()
  .registerModule(new JavaTimeModule)
  .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
val measurementReader = mapper.readerFor(classOf[Main.Measurement])
val measurementWriter = mapper.writerFor(classOf[Main.Measurement])

private def asJsonArray(fieldName: String, list: Seq[AnyRef]) = {
  val sw = new StringWriter
  val generator = jsonFactory.createGenerator(sw)
  generator.writeStartObject()
  generator.writeFieldName(fieldName)
  measurementWriter.writeValues(generator).init(true).writeAll(list.asJava)
  generator.close()
  sw.toString
}

Flow

  • (1) connection details to MQTT broker
  • (2) settings for MQTT source specifying the topic to listen to
  • (3) use helper method to cater for Paho failures on initial connect
  • (4) add a kill switch to allow for stopping the subscription
  • (5) convert incoming ByteString to String
  • (6) parse JSON
  • (7) group up to 50 messages into one, as long as they appear with 5 seconds
  • (8) convert the list of measurements to a JSON array structure
  • (9) store the JSON in a Kafka producer record
  • (10) producer to Kafka
Java
sourcefinal MqttConnectionSettings connectionSettings =
        MqttConnectionSettings.create(
                "tcp://localhost:1883", // (1)
                "coffee-client",
                new MemoryPersistence());

final String topic = "coffee/level";

MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2)

Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
        wrapWithAsRestartSource( // (3)
                () ->
                        MqttSource.atMostOnce(
                                connectionSettings.withClientId("coffee-control"), subscriptions, 8));

// Set up Kafka producer sink
ProducerSettings<String, String> producerSettings =
        ProducerSettings
                .create(toClassic(system), new StringSerializer(), new StringSerializer())
                .withBootstrapServers(kafkaServer);
Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer =
        Producer.plainSink(producerSettings);
String kafkaTopic = "measurements";

Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions =
        restartingMqttSource
                .viaMat(KillSwitches.single(), Keep.both()) // (4)
                .map(m -> m.payload().utf8String()) // (5)
                .map(measurementReader::readValue) // (6)
                .groupedWithin(50, Duration.ofSeconds(5)) // (7)
                .map(list -> asJsonArray("measurements", list)) // (8)
                .log("producing to Kafka", logAdapter)
                .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9)
                .toMat(kafkaProducer, Keep.both()) // (10)
                .run(system);
Scala
sourceval connectionSettings = MqttConnectionSettings("tcp://localhost:1883", "coffee-client", new MemoryPersistence) // (1)

val topic = "coffee/level"

val subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce) // (2)

val restartingMqttSource = wrapWithAsRestartSource( // (3)
  MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"), subscriptions, 8))

// Set up Kafka producer sink
val producerSettings = ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
val kafkaProducer = Producer.plainSink(producerSettings)
val kafkaTopic = "measurements"

val ((subscriptionInitialized, listener), streamCompletion) = restartingMqttSource
  .viaMat(KillSwitches.single)(Keep.both) // (4)
  .map(_.payload.utf8String) // (5)
  .map(measurementReader.readValue) // (6)
  .groupedWithin(50, 5.seconds) // (7)
  .map(list => asJsonArray("measurements", list)) // (8)
  .log("producing to Kafka")
  .map(json => new ProducerRecord[String, String](kafkaTopic, "", json)) // (9)
  .toMat(kafkaProducer)(Keep.both) // (10)
  .run
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.