Example code
This page highlights the most important sections of the example.
Subscribe to the Kafka topic
Use an Alpakka Kafka consumer to subscribe to a topic in Kafka. The received String values are sent to a BroadcastHub, which creates a Source for the clients to connect to.
- Java
    private Source<String, ?> topicSource() {
      ConsumerSettings<Integer, String> kafkaConsumerSettings =
          ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
              .withBootstrapServers(kafkaBootstrapServers)
              .withGroupId(groupId)
              .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
              .withStopTimeout(Duration.ofSeconds(5));

      return Consumer.plainSource(kafkaConsumerSettings, Subscriptions.topics(topic))
          .map(consumerRecord -> consumerRecord.value())
          // using a broadcast hub here, ensures that all websocket clients will use the same
          // consumer
          .runWith(BroadcastHub.of(String.class), materializer);
    }
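To illustrate why routing everything through one hub lets many clients share a single consumer, here is a minimal JDK-only sketch of the fan-out idea. It is an analogy, not how BroadcastHub is implemented: it has no backpressure and no buffering, and the names SimpleHub, attach, and publish are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for the fan-out idea behind a broadcast hub. */
class SimpleHub {
    // Subscribers currently attached; each one receives every published element.
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    /** Attach a client; in this sketch it only sees elements published after this call. */
    void attach(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Called once per record by the single upstream consumer. */
    void publish(String value) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    public static void main(String[] args) {
        SimpleHub hub = new SimpleHub();
        List<String> clientA = new ArrayList<>();
        List<String> clientB = new ArrayList<>();

        hub.attach(clientA::add);
        hub.publish("first");
        hub.attach(clientB::add); // joins late, so it misses "first"
        hub.publish("second");

        System.out.println(clientA); // [first, second]
        System.out.println(clientB); // [second]
    }
}
```

Each record is read from the upstream once and handed to every attached client, which is the property the comment in topicSource() relies on.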
Handler for websocket requests
The webSocketHandler is a Flow that is used whenever a websocket client connects. It ignores data sent to it and publishes all data received from topicSource(), which is backed by the BroadcastHub.
- Java
    Flow<Message, Message, ?> webSocketHandler =
        Flow.fromSinkAndSource(
            Sink.ignore(),
            topicSource()
                // decouple clients from each other: if a client is too slow and more than 1000
                // elements to be sent to the client queue up here, we fail this client
                .buffer(1000, OverflowStrategy.fail())
                .via(addIndexFlow())
                .map(TextMessage::create));
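The per-client buffer with a failing overflow strategy can be pictured with a small JDK-only sketch. This is an illustration of the policy only, not Akka's implementation; the class BoundedClientBuffer and its methods are hypothetical, and the capacity of 2 in the usage is chosen just to keep the example short.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical per-client buffer that fails instead of growing without bound. */
class BoundedClientBuffer {
    private final int capacity;
    private final Queue<String> pending = new ArrayDeque<>();

    BoundedClientBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Queue an element for the client; fail this client if the buffer is already full. */
    void enqueue(String element) {
        if (pending.size() >= capacity) {
            throw new IllegalStateException("client buffer full (capacity " + capacity + ")");
        }
        pending.add(element);
    }

    /** Called when the client is ready for the next element; returns null if nothing is pending. */
    String poll() {
        return pending.poll();
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        BoundedClientBuffer slowClient = new BoundedClientBuffer(2);
        slowClient.enqueue("a");
        slowClient.enqueue("b");
        try {
            slowClient.enqueue("c"); // third element exceeds the capacity of 2
        } catch (IllegalStateException e) {
            System.out.println("client failed: " + e.getMessage());
        }
    }
}
```

Because each client has its own buffer, a slow client only fails its own connection; the shared consumer and the other clients keep going.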
Akka HTTP routes
This example code uses two routes:
* /events, which opens a websocket to subscribe to the messages from the Kafka topic
* /push, which writes the text from the parameter value to the Kafka topic
- Java
    private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
      return concat(
          path("events", () -> handleWebSocketMessages(webSocketHandler)),
          path("push", () -> parameter("value", v -> {
            CompletionStage<Done> written = helper.writeToKafka(topic, v, actorSystem);
            return onSuccess(written, done -> complete("Ok"));
          }))
      );
    }
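When trying out the /push route from a client, the text has to be URL-encoded before it goes into the value query parameter. The following is a small sketch of building such a request URI with the JDK only; the helper PushUri and the base URL http://localhost:8080 are assumptions, since the example does not state the host and port the server binds to.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for building a request URI for the /push route. */
class PushUri {
    /** baseUrl (scheme, host, port) is an assumption; the route and parameter name come from the example. */
    static URI forValue(String baseUrl, String value) {
        // URLEncoder produces form encoding and emits '+' for spaces;
        // replace it with %20 so the query string is unambiguous.
        String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create(baseUrl + "/push?value=" + encoded);
    }

    public static void main(String[] args) {
        System.out.println(forValue("http://localhost:8080", "hello world & more"));
        // prints http://localhost:8080/push?value=hello%20world%20%26%20more
    }
}
```

The resulting URI can be passed to any HTTP client; clients connected to /events should then receive the pushed text.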
All imports
- Java
    import akka.Done;
    import akka.NotUsed;
    import akka.actor.ActorSystem;
    import akka.http.javadsl.ConnectHttp;
    import akka.http.javadsl.Http;
    import akka.http.javadsl.ServerBinding;
    import akka.http.javadsl.model.HttpRequest;
    import akka.http.javadsl.model.HttpResponse;
    import akka.http.javadsl.model.ws.Message;
    import akka.http.javadsl.model.ws.TextMessage;
    import akka.http.javadsl.server.AllDirectives;
    import akka.http.javadsl.server.Route;
    import akka.japi.Pair;
    import akka.kafka.ConsumerSettings;
    import akka.kafka.Subscriptions;
    import akka.kafka.javadsl.Consumer;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import akka.stream.OverflowStrategy;
    import akka.stream.SystemMaterializer;
    import akka.stream.javadsl.BroadcastHub;
    import akka.stream.javadsl.Flow;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.IntegerDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.time.Duration;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.TimeUnit;