Full source
The full example contains code to start Kafka in a Testcontainers container; the MQTT broker is expected to be reachable at tcp://localhost:1883.
- Java
-
source
/* * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com> */ package samples.javadsl; import akka.Done; import akka.actor.typed.ActorSystem; import akka.actor.typed.javadsl.Behaviors; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.japi.Creator; import akka.japi.Pair; import akka.kafka.ConsumerSettings; import akka.kafka.ProducerSettings; import akka.kafka.Subscriptions; import akka.kafka.javadsl.Consumer; import akka.kafka.javadsl.Producer; import akka.stream.KillSwitch; import akka.stream.KillSwitches; import akka.stream.UniqueKillSwitch; import akka.stream.alpakka.mqtt.MqttConnectionSettings; import akka.stream.alpakka.mqtt.MqttMessage; import akka.stream.alpakka.mqtt.MqttQoS; import akka.stream.alpakka.mqtt.MqttSubscriptions; import akka.stream.alpakka.mqtt.javadsl.MqttSink; import akka.stream.alpakka.mqtt.javadsl.MqttSource; import akka.stream.javadsl.Keep; import akka.stream.javadsl.RestartSource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; import akka.util.ByteString; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.KafkaContainer; import java.io.IOException; import java.io.StringWriter; import java.time.Duration; import java.time.Instant; 
import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import static akka.actor.typed.javadsl.Adapter.toClassic; public class Main { public static void main(String[] args) throws Exception { KafkaContainer kafkaContainer = new KafkaContainer("5.4.1"); kafkaContainer.start(); try { Main me = new Main(); me.run(kafkaContainer.getBootstrapServers()); } finally { kafkaContainer.stop(); } } final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "MqttToKafka"); final Logger log = LoggerFactory.getLogger(Main.class); // #json-mechanics /** * Data elements sent via MQTT broker. */ public static final class Measurement { public final Instant timestamp; public final long level; @JsonCreator public Measurement( @JsonProperty("timestamp") Instant timestamp, @JsonProperty("level") long level) { this.timestamp = timestamp; this.level = level; } } private final JsonFactory jsonFactory = new JsonFactory(); final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule()); final ObjectReader measurementReader = mapper.readerFor(Measurement.class); final ObjectWriter measurementWriter = mapper.writerFor(Measurement.class); private String asJsonArray(String fieldName, List<Object> list) throws IOException { StringWriter sw = new StringWriter(); JsonGenerator generator = jsonFactory.createGenerator(sw); generator.writeStartObject(); generator.writeFieldName(fieldName); measurementWriter.writeValues(generator).init(true).writeAll(list); generator.close(); return sw.toString(); } // #json-mechanics // #restarting /** * Wrap a source with restart logic and exposes an equivalent materialized value. 
*/ <M> Source<M, CompletionStage<Done>> wrapWithAsRestartSource( Creator<Source<M, CompletionStage<Done>>> source) { // makes use of the fact that these sources materialize a CompletionStage<Done> CompletableFuture<Done> fut = new CompletableFuture<>(); return RestartSource.withBackoff( Duration.ofMillis(100), Duration.ofSeconds(3), 0.2d, // randomFactor 5, // maxRestarts, () -> source .create() .mapMaterializedValue( mat -> mat.handle( (done, exception) -> { if (done != null) { fut.complete(done); } else { fut.completeExceptionally(exception); } return fut.toCompletableFuture(); }))) .mapMaterializedValue(ignore -> fut.toCompletableFuture()); } // #restarting void run(String kafkaServer) throws Exception { final LoggingAdapter logAdapter = Logging.getLogger(system.classicSystem(), getClass().getName()); // #flow final MqttConnectionSettings connectionSettings = MqttConnectionSettings.create( "tcp://localhost:1883", // (1) "coffee-client", new MemoryPersistence()); final String topic = "coffee/level"; MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce()); // (2) Source<MqttMessage, CompletionStage<Done>> restartingMqttSource = wrapWithAsRestartSource( // (3) () -> MqttSource.atMostOnce( connectionSettings.withClientId("coffee-control"), subscriptions, 8)); // Set up Kafka producer sink ProducerSettings<String, String> producerSettings = ProducerSettings .create(toClassic(system), new StringSerializer(), new StringSerializer()) .withBootstrapServers(kafkaServer); Sink<ProducerRecord<String, String>, CompletionStage<Done>> kafkaProducer = Producer.plainSink(producerSettings); String kafkaTopic = "measurements"; Pair<Pair<CompletionStage<Done>, UniqueKillSwitch>, CompletionStage<Done>> completions = restartingMqttSource .viaMat(KillSwitches.single(), Keep.both()) // (4) .map(m -> m.payload().utf8String()) // (5) .map(measurementReader::readValue) // (6) .groupedWithin(50, Duration.ofSeconds(5)) // (7) .map(list -> 
asJsonArray("measurements", list)) // (8) .log("producing to Kafka", logAdapter) .map(json -> new ProducerRecord<>(kafkaTopic, "", json)) // (9) .toMat(kafkaProducer, Keep.both()) // (10) .run(system); // #flow // start producing messages to MQTT CompletionStage<Done> subscriptionInitialized = completions.first().first(); CompletionStage<UniqueKillSwitch> producer = subscriptionInitialized.thenApply( d -> produceMessages(measurementWriter, connectionSettings, topic)); KillSwitch listener = completions.first().second(); CompletionStage<Done> streamCompletion = completions.second(); streamCompletion .handle( (done, exception) -> { if (exception != null) { exception.printStackTrace(); return null; } else { return done; } }) .thenRun(system::terminate); // read the messages from the Kafka topic Consumer.Control consumerControl = Consumer .plainSource( ConsumerSettings.create(toClassic(system), new StringDeserializer(), new StringDeserializer()) .withBootstrapServers(kafkaServer).withGroupId("sample"), Subscriptions.topics(kafkaTopic) ) .map(ConsumerRecord::value) .log("read from Kafka", logAdapter) .toMat(Sink.ignore(), Keep.left()) .run(system); log.info("Letting things run for a while"); Thread.sleep(20 * 1000); producer.thenAccept(UniqueKillSwitch::shutdown); consumerControl.shutdown(); listener.shutdown(); } /** * Simulate messages from MQTT by writing to topic registered in MQTT broker. 
*/ private UniqueKillSwitch produceMessages( ObjectWriter measurementWriter, MqttConnectionSettings connectionSettings, String topic) { List<Measurement> input = Arrays.asList( new Measurement(Instant.now(), 40), new Measurement(Instant.now(), 60), new Measurement(Instant.now(), 80), new Measurement(Instant.now(), 100), new Measurement(Instant.now(), 120)); MqttConnectionSettings sinkSettings = connectionSettings.withClientId("coffee-supervisor"); final Sink<MqttMessage, CompletionStage<Done>> mqttSink = MqttSink.create(sinkSettings, MqttQoS.atLeastOnce()); UniqueKillSwitch killSwitch = Source.cycle(() -> input.iterator()) .throttle(4, Duration.ofSeconds(1)) .map(measurementWriter::writeValueAsString) .map(s -> MqttMessage.create(topic, ByteString.fromString(s))) .viaMat(KillSwitches.single(), Keep.right()) .toMat(mqttSink, Keep.left()) .run(system); return killSwitch; } }
- Scala
-
source
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples.scaladsl

import java.io.StringWriter
import java.time.Instant

import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.event.{Logging, LoggingAdapter}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.alpakka.mqtt.{MqttConnectionSettings, MqttMessage, MqttQoS, MqttSubscriptions}
import akka.stream.alpakka.mqtt.scaladsl.{MqttSink, MqttSource}
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.util.ByteString
import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty}
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.{ObjectMapper, ObjectWriter}
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import org.slf4j.LoggerFactory
import org.testcontainers.containers.KafkaContainer

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.jdk.CollectionConverters._

/**
 * Entry point: starts a Kafka container, runs the sample against it and stops the container
 * afterwards, even if the sample fails.
 */
object Main extends App {
  val kafkaContainer = new KafkaContainer("5.4.1")
  kafkaContainer.start()
  try {
    val me = new Main
    me.run(kafkaContainer.getBootstrapServers)
  } finally kafkaContainer.stop()

  // #json-mechanics
  /**
   * Data elements sent via MQTT broker.
   */
  final case class Measurement(timestamp: Instant, level: Long)

  // #json-mechanics
}

/**
 * Sample that reads JSON measurements from an MQTT topic, batches them, and publishes each batch
 * as a single JSON document to a Kafka topic.
 */
class Main {
  implicit val system = ActorSystem(Behaviors.empty, "MqttToKafka")
  implicit val ec: ExecutionContext = system.executionContext

  val log = LoggerFactory.getLogger(classOf[Main])

  // #json-mechanics
  val jsonFactory = new JsonFactory
  val mapper = new ObjectMapper()
    .registerModule(new JavaTimeModule)
    .registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
  val measurementReader = mapper.readerFor(classOf[Main.Measurement])
  val measurementWriter = mapper.writerFor(classOf[Main.Measurement])

  /**
   * Renders `list` as a JSON object with a single field `fieldName` whose value is the array of
   * the serialized list elements.
   */
  private def asJsonArray(fieldName: String, list: Seq[AnyRef]): String = {
    val sw = new StringWriter
    val generator = jsonFactory.createGenerator(sw)
    try {
      generator.writeStartObject()
      generator.writeFieldName(fieldName)
      measurementWriter.writeValues(generator).init(true).writeAll(list.asJava)
    } finally {
      // Closing is relied on to terminate the still-open array and object (Jackson's
      // AUTO_CLOSE_JSON_CONTENT default); doing it in `finally` also covers failed writes.
      generator.close()
    }
    sw.toString
  }
  // #json-mechanics

  // #restarting
  /**
   * Wrap a source with restart logic and expose an equivalent materialized value.
   *
   * The by-name `source` is re-evaluated on every (re)start. All attempts share one promise, so
   * the returned future reflects whichever attempt's materialized future completes first.
   */
  private def wrapWithAsRestartSource[M](source: => Source[M, Future[Done]]): Source[M, Future[Done]] = {
    val fut = Promise[Done]()
    RestartSource.withBackoff(100.millis, 3.seconds, randomFactor = 0.2d, maxRestarts = 5) {
      () => source.mapMaterializedValue(mat => fut.completeWith(mat))
    }.mapMaterializedValue(_ => fut.future)
  }
  // #restarting

  /**
   * Runs the sample: wires the MQTT source to the Kafka producer, starts a simulated MQTT
   * publisher, reads the produced records back from Kafka, lets everything run for 20 seconds
   * and then shuts the streams down.
   */
  private def run(kafkaServer: String): Unit = {
    implicit val logAdapter: LoggingAdapter = Logging.getLogger(system.toClassic, getClass.getName)
    // #flow
    val connectionSettings =
      MqttConnectionSettings("tcp://localhost:1883", "coffee-client", new MemoryPersistence) // (1)

    val topic = "coffee/level"

    val subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce) // (2)

    val restartingMqttSource = wrapWithAsRestartSource( // (3)
      MqttSource.atMostOnce(connectionSettings.withClientId("coffee-control"), subscriptions, 8))

    // Set up Kafka producer sink
    val producerSettings =
      ProducerSettings(system.toClassic, new StringSerializer, new StringSerializer).withBootstrapServers(kafkaServer)
    val kafkaProducer = Producer.plainSink(producerSettings)
    val kafkaTopic = "measurements"

    val ((subscriptionInitialized, listener), streamCompletion) = restartingMqttSource
      .viaMat(KillSwitches.single)(Keep.both) // (4)
      .map(_.payload.utf8String) // (5)
      .map(measurementReader.readValue) // (6)
      .groupedWithin(50, 5.seconds) // (7)
      .map(list => asJsonArray("measurements", list)) // (8)
      .log("producing to Kafka")
      .map(json => new ProducerRecord[String, String](kafkaTopic, "", json)) // (9)
      .toMat(kafkaProducer)(Keep.both) // (10)
      .run()
    // #flow

    // start producing messages to MQTT
    // (only once the MQTT source's materialized future has completed successfully)
    val producer = subscriptionInitialized.map(_ => produceMessages(connectionSettings, topic))

    // Terminate the actor system when the MQTT-to-Kafka stream ends, whether it failed or not.
    streamCompletion
      .recover {
        case exception =>
          // report through the logger instead of dumping the stack trace to stderr
          log.error("MQTT to Kafka stream failed", exception)
          null
      }
      .foreach(_ => system.terminate())

    // read the messages from the Kafka topic
    val consumerControl = Consumer
      .plainSource(
        ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
          .withBootstrapServers(kafkaServer)
          .withGroupId("sample"),
        Subscriptions.topics(kafkaTopic)
      )
      .map(_.value)
      .log("read from Kafka")
      .toMat(Sink.ignore)(Keep.left)
      .run()

    log.info("Letting things run for a while")
    Thread.sleep(20 * 1000)

    // Stop the simulated publisher, the Kafka consumer and finally the MQTT listener.
    producer.foreach(_.shutdown())
    consumerControl.shutdown()
    listener.shutdown()
  }

  /**
   * Simulate messages from MQTT by writing to topic registered in MQTT broker.
   *
   * Cycles endlessly through five fixed measurements, throttled to four messages per second,
   * until the returned kill switch is triggered.
   */
  private def produceMessages(connectionSettings: MqttConnectionSettings, topic: String): UniqueKillSwitch = {
    import Main.Measurement
    val input = Seq(
      Measurement(Instant.now, 40),
      Measurement(Instant.now, 60),
      Measurement(Instant.now, 80),
      Measurement(Instant.now, 100),
      Measurement(Instant.now, 120)
    )

    val sinkSettings = connectionSettings.withClientId("coffee-supervisor")

    val killSwitch = Source
      .cycle(() => input.iterator)
      .throttle(4, 1.second)
      .map(measurementWriter.writeValueAsString)
      .map((s: String) => MqttMessage.create(topic, ByteString.fromString(s)))
      .viaMat(KillSwitches.single)(Keep.right)
      .toMat(MqttSink(sinkSettings, MqttQoS.atLeastOnce))(Keep.left)
      .run()
    killSwitch
  }
}