Example code
This example uses an MQTT broker, which may be started via Docker Compose, and a Kafka broker, which is started from within the JVM by Testcontainers.
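For reference, a minimal sketch (in Scala) of how such an embedded Kafka broker could be brought up with Testcontainers; the image tag and value names are illustrative assumptions, not part of the sample:

import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

// Start a Kafka broker in a Docker container from within the JVM
// (the image tag is an assumption for illustration)
val kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
kafkaContainer.start()

// Bootstrap servers address, used as kafkaServer in the producer settings below
val kafkaServer: String = kafkaContainer.getBootstrapServers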
Starting the MQTT broker
> docker-compose up -d
docker-compose.yml:

version: '2'
services:
  mqtt:
    image: toke/mosquitto
    ports:
      - "1883:1883"
    volumes:
      - ./src/main/travis:/mqtt/config/conf.d
Restarting the source
The MQTT source is wrapped in a RestartSource to mitigate the Paho initial connection problem.
Java:

/** Wrap a source with restart logic and expose an equivalent materialized value. */
<M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
    Creator<Source<M, CompletionStage<Done>>> source) {
  // makes use of the fact that these sources materialize a CompletionStage<Done>
  CompletableFuture<Done> fut = new CompletableFuture<>();
  return RestartSource.withBackoff(
          Duration.ofMillis(100),
          Duration.ofSeconds(3),
          0.2d, // randomFactor
          5, // maxRestarts
          () ->
              source
                  .create()
                  .mapMaterializedValue(
                      mat ->
                          mat.handle(
                              (done, exception) -> {
                                if (done != null) {
                                  fut.complete(done);
                                } else {
                                  fut.completeExceptionally(exception);
                                }
                                return fut.toCompletableFuture();
                              })))
      .mapMaterializedValue(ignore -> fut.toCompletableFuture());
}

Scala:

/** Wrap a source with restart logic and expose an equivalent materialized value. */
private def wrapWithAsRestartSource[M](source: => Source[M, Future[Done]]): Source[M, Future[Done]] = {
  val fut = Promise[Done]
  RestartSource
    .withBackoff(100.millis, 3.seconds, randomFactor = 0.2d, maxRestarts = 5) { () =>
      source.mapMaterializedValue(mat => fut.completeWith(mat))
    }
    .mapMaterializedValue(_ => fut.future)
}
JSON helper code
To use Java 8 time types (Instant) with Jackson, extra dependencies are required.
sbt:

libraryDependencies ++= Seq(
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.10.0",
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.10.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.0"
)
Maven:

<properties>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jdk8</artifactId>
    <version>2.10.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
    <version>2.10.0</version>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
    <version>2.10.0</version>
  </dependency>
</dependencies>
Gradle:

def versions = [
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.0"
  implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.10.0"
  implementation "com.fasterxml.jackson.module:jackson-module-scala_${versions.ScalaBinary}:2.10.0"
}
Data class and JSON mapping
Java:

/** Data elements sent via MQTT broker. */
public static final class Measurement {
  public final Instant timestamp;
  public final long level;

  @JsonCreator
  public Measurement(
      @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) {
    this.timestamp = timestamp;
    this.level = level;
  }
}

private final JsonFactory jsonFactory = new JsonFactory();

final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);

private String asJsonArray(String fieldName, List<Object> list) throws IOException {
  StringWriter sw = new StringWriter();
  JsonGenerator generator = jsonFactory.createGenerator(sw);
  generator.writeStartObject();
  generator.writeFieldName(fieldName);
  measurementWriter.writeValues(generator).init(true).writeAll(list);
  generator.close();
  return sw.toString();
}

Scala:

/** Data elements sent via MQTT broker. */
final case class Measurement(timestamp: Instant, level: Long)

val jsonFactory = new JsonFactory
val mapper = new ObjectMapper()
  .registerModule(new JavaTimeModule)
  .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
val measurementReader = mapper.readerFor(classOf[Main.Measurement])
val measurementWriter = mapper.writerFor(classOf[Main.Measurement])

private def asJsonArray(fieldName: String, list: Seq[AnyRef]) = {
  val sw = new StringWriter
  val generator = jsonFactory.createGenerator(sw)
  generator.writeStartObject()
  generator.writeFieldName(fieldName)
  measurementWriter.writeValues(generator).init(true).writeAll(list.asJava)
  generator.close()
  sw.toString
}
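For illustration, the reader and the asJsonArray helper combine as in the following sketch (Scala; the sample payload and values are made up):

// Parse a single measurement from its JSON representation
val single = """{"timestamp":"2020-06-01T10:00:00Z","level":7}"""
val measurement: Measurement = measurementReader.readValue(single)

// Wrap a batch of measurements in an object holding a JSON array,
// producing something of the form {"measurements":[{...}]}
val batch = asJsonArray("measurements", List(measurement))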
Flow
- (1) connection details to MQTT broker
- (2) settings for MQTT source specifying the topic to listen to
- (3) use the helper method to cater for Paho failures on initial connect
- (4) add a kill switch to allow for stopping the subscription
- (5) convert incoming ByteString to String
- (6) parse JSON
- (7) group up to 50 messages into one, as long as they appear within 5 seconds
- (8) convert the list of measurements to a JSON array structure
- (9) store the JSON in a Kafka producer record
- (10) produce to Kafka
Java:

final MqttConnectionSettings connectionSettings =
    MqttConnectionSettings.create(
        "tcp://localhost:1883", // (1)
        "coffee-client",
        new MemoryPersistence());

final String topic = "coffee/level";
MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2)

Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
    wrapWithAsRestartSource( // (3)
        () ->
            MqttSource.atMostOnce(
                connectionSettings.withClientId("coffee-control"), subscriptions, 8));

// Set up Kafka producer sink
ProducerSettings<String, String> producerSettings =
    ProducerSettings.create(toClassic(system), new StringSerializer(), new StringSerializer())
        .withBootstrapServers(kafkaServer);
Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer =
    Producer.plainSink(producerSettings);
String kafkaTopic = "measurements";

Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions =
    restartingMqttSource
        .viaMat(KillSwitches.single(), Keep.both()) // (4)
        .map(m -> m.payload().utf8String()) // (5)
        .map(measurementReader::readValue) // (6)
        .groupedWithin(50, Duration.ofSeconds(5)) // (7)
        .map(list -> asJsonArray("measurements", list)) // (8)
        .log("producing to Kafka", logAdapter)
        .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9)
        .toMat(kafkaProducer, Keep.both()) // (10)
        .run(system);
Scala:

val connectionSettings =
  MqttConnectionSettings("tcp://localhost:1883", "coffee-client", new MemoryPersistence) // (1)

val topic = "coffee/level"
val subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce) // (2)

val restartingMqttSource = wrapWithAsRestartSource( // (3)
  MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"), subscriptions, 8))

// Set up Kafka producer sink
val producerSettings =
  ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer)
    .withBootstrapServers(kafkaServer)
val kafkaProducer = Producer.plainSink(producerSettings)
val kafkaTopic = "measurements"

val ((subscriptionInitialized, listener), streamCompletion) = restartingMqttSource
  .viaMat(KillSwitches.single)(Keep.both) // (4)
  .map(_.payload.utf8String) // (5)
  .map(measurementReader.readValue) // (6)
  .groupedWithin(50, 5.seconds) // (7)
  .map(list => asJsonArray("measurements", list)) // (8)
  .log("producing to Kafka")
  .map(json => new ProducerRecord[String, String](kafkaTopic, "", json)) // (9)
  .toMat(kafkaProducer)(Keep.both) // (10)
  .run()
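The materialized values give handles on the running stream. A sketch of how they might be used at shutdown (Scala; assumes an implicit ExecutionContext, e.g. system.executionContext, is in scope):

// React once the MQTT subscription has been established
subscriptionInitialized.foreach(_ => println("MQTT subscription active"))

// Stop the subscription via the kill switch and observe stream completion
listener.shutdown()
streamCompletion.onComplete(_ => system.terminate())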