Subscribe to the Kafka topic

Use an Alpakka Kafka consumer to subscribe to a Kafka topic. The received String values are sent to a BroadcastHub, which provides a Source that the clients can attach to.

private Source<String, ?> topicSource() {
    // Bootstrap servers and group id must also be set, either here via withBootstrapServers(...)
    // and withGroupId(...) or in the akka.kafka.consumer.kafka-clients configuration section.
    ConsumerSettings<Integer, String> kafkaConsumerSettings =
            ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
                    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return Consumer.plainSource(kafkaConsumerSettings, Subscriptions.topics(topic))
            .map(consumerRecord -> consumerRecord.value())
            // Using a BroadcastHub here ensures that all WebSocket clients share the same
            // consumer.
            .runWith(BroadcastHub.of(String.class), materializer);
}
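
To illustrate what the hub provides, the following plain-Java sketch models the fan-out without Akka: one upstream feeds every attached client, so the topic is read only once no matter how many clients connect. MiniHub and its methods are hypothetical names invented for this illustration. The real BroadcastHub also deals with backpressure and buffering, which this toy model leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class MiniHubDemo {

    /** Toy stand-in for a broadcast hub (hypothetical, not Akka's implementation). */
    static final class MiniHub<T> {
        private final List<Consumer<T>> clients = new CopyOnWriteArrayList<>();

        /** Attaches a client; in this model it only sees values published from now on. */
        void attach(Consumer<T> client) {
            clients.add(client);
        }

        /** Called once per upstream element, for example once per consumed record. */
        void publish(T value) {
            for (Consumer<T> client : clients) {
                client.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        MiniHub<String> hub = new MiniHub<>();
        List<String> clientA = new ArrayList<>();
        List<String> clientB = new ArrayList<>();

        hub.attach(clientA::add);
        hub.publish("first");
        hub.attach(clientB::add); // this client joins late
        hub.publish("second");

        System.out.println("client A: " + clientA); // client A: [first, second]
        System.out.println("client B: " + clientB); // client B: [second]
    }
}
```

In this sketch a client that attaches late only receives values published after it joined. Treat that as a property of the model rather than a statement about every hub configuration.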

Handler for WebSocket requests

The webSocketHandler is a Flow that is used whenever a WebSocket client connects. It ignores any data the client sends and publishes all data received from topicSource(), which is backed by the BroadcastHub.

Flow<Message, Message, ?> webSocketHandler =
        Flow.fromSinkAndSource(
                Sink.ignore(), // drop everything the client sends
                topicSource()
                        // Decouple clients from each other: if a client is too slow and more than 1000
                        // elements queue up for it here, fail this client.
                        .buffer(1000, OverflowStrategy.fail())
                        .map(TextMessage::create));
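
The effect of a bounded buffer with a failing overflow strategy can be shown with a small plain-Java model. This is only a sketch of the idea, not Akka's buffer operator: ClientBuffer is a hypothetical class, and the capacity of 3 stands in for the 1000 used in the handler. A client that keeps reading never fills its buffer, while a client that stops reading is failed as soon as one element too many arrives.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class FailingBufferDemo {

    /** Bounded per-client queue (hypothetical); an overflow fails this client only. */
    static final class ClientBuffer {
        private final int capacity;
        private final Queue<String> pending = new ArrayDeque<>();
        private boolean failed;

        ClientBuffer(int capacity) {
            this.capacity = capacity;
        }

        /** Queues a value for the client, or marks the client as failed if the buffer is full. */
        void enqueue(String value) {
            if (failed) {
                return; // a failed client receives nothing further
            }
            if (pending.size() >= capacity) {
                failed = true;
                pending.clear();
                return;
            }
            pending.add(value);
        }

        /** The client takes one element; returns null if nothing is pending. */
        String poll() {
            return pending.poll();
        }

        boolean isFailed() {
            return failed;
        }
    }

    public static void main(String[] args) {
        ClientBuffer fast = new ClientBuffer(3);
        ClientBuffer slow = new ClientBuffer(3);

        for (int i = 0; i < 5; i++) {
            String value = "msg-" + i;
            fast.enqueue(value);
            fast.poll();         // the fast client reads each element right away
            slow.enqueue(value); // the slow client never reads
        }

        System.out.println("fast failed: " + fast.isFailed()); // fast failed: false
        System.out.println("slow failed: " + slow.isFailed()); // slow failed: true
    }
}
```

Because each client has its own buffer, the slow client's failure does not affect the fast one, which is the decoupling the handler aims for.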

Akka HTTP routes

This example code uses two routes:

* /events, which opens a WebSocket that subscribes to the messages from the Kafka topic
* /push, which writes the text of the value parameter to the Kafka topic

private Route createRoute(Flow<Message, Message, ?> webSocketHandler) {
    return concat(
            path("events", () -> handleWebSocketMessages(webSocketHandler)),
            path("push", () -> parameter("value", v -> {
                CompletionStage<Done> written = helper.writeToKafka(topic, v, actorSystem);
                return onSuccess(written, done -> complete("Ok"));
            }))
    );
}
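
To try both routes from the client side, a sketch along the following lines could be used. It relies only on the JDK's java.net.http client (Java 11 or later). The host and port (localhost:8080) are assumptions, since the address the server binds to is not shown here, and EventsClient is a hypothetical class name.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletionStage;

final class EventsClient {
    // Assumption: the server listens on localhost:8080; adjust to your setup.
    static final String HOST = "localhost";
    static final int PORT = 8080;

    /** URI of the WebSocket route. */
    static URI eventsUri() {
        return URI.create("ws://" + HOST + ":" + PORT + "/events");
    }

    /** URI of the push route, with the text passed as the "value" query parameter. */
    static URI pushUri(String value) {
        String encoded = URLEncoder.encode(value, StandardCharsets.UTF_8).replace("+", "%20");
        return URI.create("http://" + HOST + ":" + PORT + "/push?value=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Subscribe: print every text message the server pushes.
        WebSocket.Listener listener = new WebSocket.Listener() {
            @Override
            public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                System.out.println("received: " + data);
                webSocket.request(1); // ask for the next message
                return null;
            }
        };
        client.newWebSocketBuilder().buildAsync(eventsUri(), listener).join();

        // Publish: send one value through the push route.
        HttpRequest push = HttpRequest.newBuilder(pushUri("hello world")).GET().build();
        HttpResponse<String> response = client.send(push, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());

        Thread.sleep(1000); // give the pushed message time to arrive before exiting
    }
}
```

With the server running, the pushed text should come back over the WebSocket connection once it has passed through the Kafka topic.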

