Example code

This page highlights the most important sections of the example.

Subscribe to the Kafka topic

Use an Alpakka Kafka consumer to subscribe to a topic in Kafka. The received String values are sent to a BroadcastHub, which creates a single Source that all clients connect to.

Java
private Source<String, ?> topicSource() {
    ConsumerSettings<Integer, String> kafkaConsumerSettings =
        ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
            .withBootstrapServers(kafkaBootstrapServers)
            .withGroupId(groupId)
            .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .withStopTimeout(Duration.ofSeconds(5));

    return
        Consumer.plainSource(kafkaConsumerSettings, Subscriptions.topics(topic))
                .map(consumerRecord -> consumerRecord.value())
                // using a broadcast hub here ensures that all websocket clients share the same
                // Kafka consumer
                .runWith(BroadcastHub.of(String.class), materializer);
}
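
The fan-out idea behind the hub can be sketched in plain Java. This is a simplified model and not the Akka BroadcastHub API: `FanOutModel`, `attach` and `publish` are hypothetical names, and the model leaves out the backpressure and buffering that the real hub handles. It only shows that one upstream publishes each element once and every attached client receives it, while a client that attaches later sees only later elements.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java model of broadcast fan-out; NOT the Akka BroadcastHub API.
// FanOutModel and its methods are hypothetical names used for illustration only.
class FanOutModel<T> {
    // One queue per attached client.
    private final List<Queue<T>> clients = new ArrayList<>();

    // A new client gets its own queue and sees only elements published from now on.
    Queue<T> attach() {
        Queue<T> queue = new ArrayDeque<>();
        clients.add(queue);
        return queue;
    }

    // The single upstream (the Kafka consumer in the example) publishes once;
    // every attached client receives the same element.
    void publish(T element) {
        for (Queue<T> queue : clients) {
            queue.add(element);
        }
    }
}

class FanOutDemo {
    public static void main(String[] args) {
        FanOutModel<String> hub = new FanOutModel<>();
        Queue<String> first = hub.attach();
        hub.publish("a");
        Queue<String> second = hub.attach(); // attaches late, misses "a"
        hub.publish("b");
        System.out.println(first);  // prints [a, b]
        System.out.println(second); // prints [b]
    }
}
```

In the example itself, the hub plays this role for websocket clients, so only one Kafka consumer is needed regardless of how many clients connect.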

Handler for websocket requests

This webSocketHandler is a Flow which is used whenever a websocket client connects. It ignores any data the client sends and publishes all data received from topicSource(), which is backed by the BroadcastHub.

Java
Flow<Message, Message, ?> webSocketHandler =
    Flow.fromSinkAndSource(
        Sink.ignore(),
        topicSource()
            // decouple clients from each other: if a client is too slow and more than 1000 elements
            // queue up here waiting to be sent to it, we fail this client
            .buffer(1000, OverflowStrategy.fail())
            .via(addIndexFlow())
            .map(TextMessage::create));
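
The effect of a bounded buffer with a fail-on-overflow strategy can be illustrated with a small plain-Java model. This is a sketch of the semantics only, not Akka code: `FailingBuffer` is a hypothetical class, and the capacity of 2 is chosen to keep the demo short (the handler above uses 1000).

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java model of a bounded per-client buffer that fails on overflow.
// NOT Akka API: FailingBuffer is a hypothetical name used for illustration only.
class FailingBuffer<T> {
    private final int capacity;
    private final Queue<T> pending = new ArrayDeque<>();

    FailingBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Called when upstream has an element for this client.
    // Throws instead of blocking or dropping once the buffer is full.
    void enqueue(T element) {
        if (pending.size() >= capacity) {
            throw new IllegalStateException("buffer overflow (max capacity: " + capacity + ")");
        }
        pending.add(element);
    }

    // Called when the client is ready for the next element; returns null if nothing is waiting.
    T dequeue() {
        return pending.poll();
    }
}

class FailingBufferDemo {
    public static void main(String[] args) {
        FailingBuffer<String> slowClient = new FailingBuffer<>(2);
        slowClient.enqueue("m1");
        slowClient.enqueue("m2");
        try {
            slowClient.enqueue("m3"); // the client never dequeued, so this overflows
        } catch (IllegalStateException e) {
            System.out.println("client failed: " + e.getMessage());
        }
    }
}
```

Because each client has its own buffer, one slow client overflowing affects only its own connection; the other clients keep receiving elements.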

Akka HTTP routes

This example code uses two routes:

* /events, which opens a websocket to subscribe to the messages from the Kafka topic
* /push, which writes the text from the parameter value to the Kafka topic

Java
private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
    return concat(
            path("events", () -> handleWebSocketMessages(webSocketHandler)),
            path("push", () -> parameter("value", v -> {
                CompletionStage<Done> written = helper.writeToKafka(topic, v, actorSystem);
                return onSuccess(written, done -> complete("Ok"));
            }))
    );
}
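
From the client side, the two routes could be addressed as in the following sketch. `RouteUris` is a hypothetical helper, and the base addresses `http://localhost:8080` and `ws://localhost:8080` are assumptions, since this page does not state which host and port the server binds to. The text for /push travels as the `value` query parameter, so it needs to be URL-encoded.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical client-side helper; RouteUris and the base addresses are assumptions,
// since the page does not state the host or port the server binds to.
class RouteUris {
    // URI for the /push route with the text passed as the "value" query parameter.
    static URI push(String httpBase, String value) {
        String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8);
        return URI.create(httpBase + "/push?value=" + encoded);
    }

    // URI for the /events websocket route.
    static URI events(String wsBase) {
        return URI.create(wsBase + "/events");
    }

    public static void main(String[] args) {
        // prints http://localhost:8080/push?value=a%26b%3Dc
        System.out.println(push("http://localhost:8080", "a&b=c"));
        // prints ws://localhost:8080/events
        System.out.println(events("ws://localhost:8080"));
    }
}
```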

All imports

Java
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.http.javadsl.ConnectHttp;
import akka.http.javadsl.Http;
import akka.http.javadsl.ServerBinding;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.japi.Pair;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.BroadcastHub;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;