Full source

The full example contains code to read measurements from an MQTT broker, group them into JSON documents, and produce those to an Apache Kafka topic. The Kafka broker is started via Testcontainers; the MQTT broker must be provided externally.
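
The stream expects the MQTT broker to listen on tcp://localhost:1883. For local testing, a Mosquitto broker can be run in Docker, for example with docker run -p 1883:1883 eclipse-mosquitto:1.6 (the 1.6 image accepts anonymous connections by default; 2.x images require explicit listener configuration).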

Java
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples.javadsl;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Creator;
import akka.japi.Pair;
import akka.kafka.ConsumerSettings;
import akka.kafka.ProducerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.alpakka.mqtt.MqttConnectionSettings;
import akka.stream.alpakka.mqtt.MqttMessage;
import akka.stream.alpakka.mqtt.MqttQoS;
import akka.stream.alpakka.mqtt.MqttSubscriptions;
import akka.stream.alpakka.mqtt.javadsl.MqttSink;
import akka.stream.alpakka.mqtt.javadsl.MqttSource;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

import java.io.IOException;
import java.io.StringWriter;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import static akka.actor.typed.javadsl.Adapter.toClassic;

public class Main {

    public static void main(String[] args) throws Exception {
        KafkaContainer kafkaContainer = new KafkaContainer("5.4.1");
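        // Note: newer Testcontainers releases deprecate this version-string
        // constructor; there the image is named explicitly, e.g.
        // new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.1"))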
        kafkaContainer.start();
        try {
            Main me = new Main();
            me.run(kafkaContainer.getBootstrapServers());
        } finally {
            kafkaContainer.stop();
        }
    }

    final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "MqttToKafka");
    final Logger log = LoggerFactory.getLogger(Main.class);

    // #json-mechanics

    /**
     * Data elements sent via the MQTT broker.
     */
    public static final class Measurement {
        public final Instant timestamp;
        public final long level;

        @JsonCreator
        public Measurement(
                @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) {
            this.timestamp = timestamp;
            this.level = level;
        }
    }

    private final JsonFactory jsonFactory = new JsonFactory();

    final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    final ObjectReader measurementReader = mapper.readerFor(Measurement.class);
    final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class);

    private String asJsonArray(String fieldName, List<Object> list) throws IOException {
        StringWriter sw = new StringWriter();
        JsonGenerator generator = jsonFactory.createGenerator(sw);
        generator.writeStartObject();
        generator.writeFieldName(fieldName);
        measurementWriter.writeValues(generator).init(true).writeAll(list);
        generator.close();
        return sw.toString();
    }
    // #json-mechanics
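
    // Illustrative payload produced by asJsonArray("measurements", list): one JSON
    // object wrapping the batched measurements, e.g. (exact timestamp rendering
    // depends on the ObjectMapper configuration)
    // {"measurements":[{"timestamp":1618485600.0,"level":40},{"timestamp":1618485601.0,"level":60}]}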

    // #restarting

    /**
     * Wrap a source with restart logic and expose an equivalent materialized value.
     */
    <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource(
            Creator<Source<M, CompletionStage<Done>>> source) {
        // makes use of the fact that these sources materialize a CompletionStage<Done>
        CompletableFuture<Done> fut = new CompletableFuture<>();
        return RestartSource.withBackoff(
                Duration.ofMillis(100),
                Duration.ofSeconds(3),
                0.2d, // randomFactor
                5, // maxRestarts
                () ->
                        source
                                .create()
                                .mapMaterializedValue(
                                        mat ->
                                                mat.handle(
                                                        (done, exception) -> {
                                                            if (done != null) {
                                                                fut.complete(done);
                                                            } else {
                                                                fut.completeExceptionally(exception);
                                                            }
                                                            return fut.toCompletableFuture();
                                                        })))
                .mapMaterializedValue(ignore -> fut.toCompletableFuture());
    }
    // #restarting
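
    // Note: this overload of RestartSource.withBackoff with explicit backoff
    // parameters is deprecated in later Akka releases. A sketch of the newer API,
    // assuming Akka 2.6.10+ where akka.stream.RestartSettings is available
    // (sourceFactory stands in for the Creator used above):
    // RestartSource.withBackoff(
    //     RestartSettings.create(Duration.ofMillis(100), Duration.ofSeconds(3), 0.2)
    //         .withMaxRestarts(5, Duration.ofSeconds(3)),
    //     sourceFactory);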

    void run(String kafkaServer) throws Exception {
        final LoggingAdapter logAdapter = Logging.getLogger(system.classicSystem(), getClass().getName());
        // #flow
        final MqttConnectionSettings connectionSettings =
                MqttConnectionSettings.create(
                        "tcp://localhost:1883", // (1)
                        "coffee-client",
                        new MemoryPersistence());

        final String topic = "coffee/level";

        MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2)

        Source<MqttMessage, CompletionStage<Done>> restartingMqttSource =
                wrapWithAsRestartSource( // (3)
                        () ->
                                MqttSource.atMostOnce(
                                        connectionSettings.withClientId("coffee-control"), subscriptions, 8));

        // Set up Kafka producer sink
        ProducerSettings<String, String> producerSettings =
                ProducerSettings
                        .create(toClassic(system), new StringSerializer(), new StringSerializer())
                        .withBootstrapServers(kafkaServer);
        Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer =
                Producer.plainSink(producerSettings);
        String kafkaTopic = "measurements";

        Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions =
                restartingMqttSource
                        .viaMat(KillSwitches.single(), Keep.both()) // (4)
                        .map(m -> m.payload().utf8String()) // (5)
                        .map(measurementReader::readValue) // (6)
                        .groupedWithin(50, Duration.ofSeconds(5)) // (7)
                        .map(list -> asJsonArray("measurements", list)) // (8)
                        .log("producing to Kafka", logAdapter)
                        .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9)
                        .toMat(kafkaProducer, Keep.both()) // (10)
                        .run(system);
        // #flow

        // start producing messages to MQTT
        CompletionStage<Done> subscriptionInitialized = completions.first().first();
        CompletionStage<UniqueKillSwitch> producer =
                subscriptionInitialized.thenApply(
                        d -> produceMessages(measurementWriter, connectionSettings, topic));

        KillSwitch listener = completions.first().second();

        CompletionStage<Done> streamCompletion = completions.second();
        streamCompletion
                .handle(
                        (done, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                                return null;
                            } else {
                                return done;
                            }
                        })
                .thenRun(system::terminate);

        // read the messages from the Kafka topic
        Consumer.Control consumerControl =
                Consumer
                        .plainSource(
                                ConsumerSettings.create(toClassic(system), new StringDeserializer(), new StringDeserializer())
                                        .withBootstrapServers(kafkaServer).withGroupId("sample"),
                                Subscriptions.topics(kafkaTopic)
                        )
                        .map(ConsumerRecord::value)
                        .log("read from Kafka", logAdapter)
                        .toMat(Sink.ignore(), Keep.left())
                        .run(system);

        log.info("Letting things run for a while");
        Thread.sleep(20 * 1000);

        producer.thenAccept(UniqueKillSwitch::shutdown);
        consumerControl.shutdown();
        listener.shutdown();
    }

    /**
     * Simulate messages from MQTT by writing to the topic registered with the MQTT broker.
     */
    private UniqueKillSwitch produceMessages(
            ObjectWriter measurementWriter, MqttConnectionSettings connectionSettings, String topic) {
        List<Measurement> input =
                Arrays.asList(
                        new Measurement(Instant.now(), 40),
                        new Measurement(Instant.now(), 60),
                        new Measurement(Instant.now(), 80),
                        new Measurement(Instant.now(), 100),
                        new Measurement(Instant.now(), 120));

        MqttConnectionSettings sinkSettings = connectionSettings.withClientId("coffee-supervisor");

        final Sink<MqttMessage, CompletionStage<Done>> mqttSink =
                MqttSink.create(sinkSettings, MqttQoS.atLeastOnce());
        UniqueKillSwitch killSwitch =
                Source.cycle(() -> input.iterator())
                        .throttle(4, Duration.ofSeconds(1))
                        .map(measurementWriter::writeValueAsString)
                        .map(s -> MqttMessage.create(topic, ByteString.fromString(s)))
                        .viaMat(KillSwitches.single(), Keep.right())
                        .toMat(mqttSink, Keep.left())
                        .run(system);
        return killSwitch;
    }
}
Scala
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */
package samples.scaladsl

import java.io.StringWriter
import java.time.Instant

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.event.{Logging, LoggingAdapter}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.alpakka.mqtt.{MqttConnectionSettings, MqttMessage, MqttQoS, MqttSubscriptions}
import akka.stream.alpakka.mqtt.scaladsl.{MqttSink, MqttSource}
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.util.ByteString
import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.{ObjectMapper, ObjectWriter}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._

object Main extends App {
  val kafkaContainer = new KafkaContainer("5.4.1")
  kafkaContainer.start()
  try {
    val me = new Main
    me.run(kafkaContainer.getBootstrapServers)
  } finally kafkaContainer.stop()

  // #json-mechanics
  /**
   * Data elements sent via the MQTT broker.
   */
  final case class Measurement(timestamp: Instant, level: Long)

  // #json-mechanics

}

class Main {
  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "MqttToKafka")
  implicit val ec: ExecutionContext = system.executionContext
  val log = LoggerFactory.getLogger(classOf[Main])

  // #json-mechanics
  val jsonFactory = new JsonFactory
  val mapper = new ObjectMapper()
    .registerModule(new JavaTimeModule)
    .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
  val measurementReader = mapper.readerFor(classOf[Main.Measurement])
  val measurementWriter = mapper.writerFor(classOf[Main.Measurement])

  private def asJsonArray(fieldName: String, list: Seq[AnyRef]) = {
    val sw = new StringWriter
    val generator = jsonFactory.createGenerator(sw)
    generator.writeStartObject()
    generator.writeFieldName(fieldName)
    measurementWriter.writeValues(generator).init(true).writeAll(list.asJava)
    generator.close()
    sw.toString
  }
  // #json-mechanics

  // #restarting
  /**
   * Wrap a source with restart logic and expose an equivalent materialized value.
   */
  private def wrapWithAsRestartSource[M](source: => Source[M, Future[Done]]): Source[M, Future[Done]] = {
    val fut = Promise[Done]()
    RestartSource.withBackoff(100.millis, 3.seconds, randomFactor = 0.2d, maxRestarts = 5) {
      () => source.mapMaterializedValue(mat => fut.completeWith(mat))
    }.mapMaterializedValue(_ => fut.future)
  }
  // #restarting

  private def run(kafkaServer: String): Unit = {
    implicit val logAdapter: LoggingAdapter = Logging.getLogger(system.toClassic, getClass.getName)
    // #flow
    val connectionSettings = MqttConnectionSettings("tcp://localhost:1883", "coffee-client", new MemoryPersistence) // (1)

    val topic = "coffee/level"

    val subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce) // (2)

    val restartingMqttSource = wrapWithAsRestartSource( // (3)
      MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"), subscriptions, 8))

    // Set up Kafka producer sink
    val producerSettings = ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
    val kafkaProducer = Producer.plainSink(producerSettings)
    val kafkaTopic = "measurements"

    val ((subscriptionInitialized, listener), streamCompletion) = restartingMqttSource
      .viaMat(KillSwitches.single)(Keep.both) // (4)
      .map(_.payload.utf8String) // (5)
      .map(measurementReader.readValue) // (6)
      .groupedWithin(50, 5.seconds) // (7)
      .map(list => asJsonArray("measurements", list)) // (8)
      .log("producing to Kafka")
      .map(json => new ProducerRecord[String, String](kafkaTopic, "", json)) // (9)
      .toMat(kafkaProducer)(Keep.both) // (10)
      .run()
    // #flow

    // start producing messages to MQTT
    val producer = subscriptionInitialized.map(_ => produceMessages(connectionSettings, topic))
    streamCompletion
      .recover {
        case exception =>
          exception.printStackTrace()
          null
      }
      .foreach(_ => system.terminate())

    // read the messages from the Kafka topic
    val consumerControl = Consumer
      .plainSource(
        ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer).withBootstrapServers(kafkaServer).withGroupId("sample"),
        Subscriptions.topics(kafkaTopic)
      )
      .map(_.value)
      .log("read from Kafka")
      .toMat(Sink.ignore)(Keep.left)
      .run()

    log.info("Letting things run for a while")
    Thread.sleep(20 * 1000)
    producer.foreach(_.shutdown())
    consumerControl.shutdown()
    listener.shutdown()
  }

  /**
   * Simulate messages from MQTT by writing to the topic registered with the MQTT broker.
   */
  private def produceMessages(connectionSettings: MqttConnectionSettings, topic: String): UniqueKillSwitch = {
    import Main.Measurement
    val input = Seq(
      Measurement(Instant.now, 40),
      Measurement(Instant.now, 60),
      Measurement(Instant.now, 80),
      Measurement(Instant.now, 100),
      Measurement(Instant.now, 120)
    )

    val sinkSettings = connectionSettings.withClientId("coffee-supervisor")
    val killSwitch = Source
      .cycle(() => input.iterator)
      .throttle(4, 1.second)
      .map(measurementWriter.writeValueAsString)
      .map((s: String) => MqttMessage.create(topic, ByteString.fromString(s)))
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(MqttSink(sinkSettings, MqttQoS.atLeastOnce))(Keep.left)
      .run()
    killSwitch
  }
}