Example code

Description

  • Configure Kafka consumer (1)
  • Data class mapped to Elasticsearch (2)
  • Spray JSON (Scala) / Jackson (Java) conversion for the data class (3)
  • Elasticsearch setup (4)
  • Kafka consumer with committing support (5)
  • Parse message from Kafka to Movie and create Elasticsearch write message (6)
  • Use createWithContext to run the Elasticsearch flow with context support, so the Kafka commit offset is passed through (7)
  • React on write errors (8)
  • Let the Committer aggregate offsets into batches and commit them to Kafka (9)
  • Combine consumer control and stream completion into DrainingControl (10)

Data class and JSON mapping

Java
package samples.javadsl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

// Type in Elasticsearch (2)
public class Movie {
    public final int id;
    public final String title;

    @JsonCreator
    public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
        this.id = id;
        this.title = title;
    }

    @Override
    public String toString() {
        return "Movie(" + id + ", title=" + title + ")";
    }
}
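
// Jackson conversion setup (3): the ObjectMapper imported above is used as
// JsonMappers.mapper and JsonMappers.movieReader in the flow below; shown
// here as a sketch so the snippet matches those references
public class JsonMappers {
    public static final ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    public static final ObjectReader movieReader = mapper.readerFor(Movie.class);
    public static final ObjectWriter movieWriter = mapper.writerFor(Movie.class);
}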
Scala
package samples.scaladsl

import spray.json.DefaultJsonProtocol._
import spray.json._

// Type in Elasticsearch (2)
case class Movie(id: Int, title: String)

object JsonFormats {
  // Spray JSON conversion setup (3)
  implicit val movieFormat: JsonFormat[Movie] = jsonFormat2(Movie)
}
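
For illustration, a Movie round-trips through this Spray JSON format as follows (a minimal sketch, assuming only the import of JsonFormats._):

Scala
import samples.scaladsl.JsonFormats._
import spray.json._

val json: String = Movie(23, "Psycho").toJson.compactPrint // {"id":23,"title":"Psycho"}
val movie: Movie = json.parseJson.convertTo[Movie]         // Movie(23,Psycho)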

Flow

Java
Consumer.DrainingControl<Done> control =
        Consumer.sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
                .map(
                        consumerRecord -> { // (6)
                            Movie movie = JsonMappers.movieReader.readValue(consumerRecord.value());
                            return WriteMessage.createUpsertMessage(String.valueOf(movie.id), movie);
                        })
                .via(
                        ElasticsearchFlow.createWithContext(
                                ElasticsearchParams.V7(indexName),
                                ElasticsearchWriteSettings.create(elasticsearchAddress),
                                JsonMappers.mapper)) // (7)
                .map(
                        writeResult -> { // (8)
                            writeResult
                                    .getError()
                                    .ifPresent(
                                            errorJson -> {
                                                throw new RuntimeException(
                                                        "Elasticsearch update failed "
                                                                + writeResult.getErrorReason().orElse(errorJson));
                                            });
                            return NotUsed.notUsed();
                        })
                .toMat(Committer.sinkWithOffsetContext(CommitterSettings.create(toClassic(actorSystem))), Keep.both()) // (9)
                .mapMaterializedValue(Consumer::createDrainingControl) // (10)
                .run(actorSystem);
Scala
val control: Consumer.DrainingControl[Done] = Consumer
  .sourceWithOffsetContext(kafkaConsumerSettings, Subscriptions.topics(topic)) // (5)
  .map { consumerRecord => // (6)
    val movie = consumerRecord.value().parseJson.convertTo[Movie]
    WriteMessage.createUpsertMessage(movie.id.toString, movie)
  }
  .via(ElasticsearchFlow.createWithContext(
    ElasticsearchParams.V7(indexName), ElasticsearchWriteSettings(connectionSettings))) // (7)
  .map { writeResult => // (8)
    writeResult.error.foreach { errorJson =>
      throw new RuntimeException(s"Elasticsearch update failed ${writeResult.errorReason.getOrElse(errorJson)}")
    }
    NotUsed
  }
  .via(Committer.flowWithOffsetContext(CommitterSettings(actorSystem.toClassic))) // (9)
  .toMat(Sink.ignore)(Consumer.DrainingControl.apply) // (10)
  .run()
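
On shutdown, the DrainingControl materialized above stops the Kafka consumer, waits until in-flight records have been processed and their offsets committed, and then completes the stream. A minimal sketch of that call, reusing control and actorSystem from above (the 10 second wait is an arbitrary choice):

Scala
import scala.concurrent.Await
import scala.concurrent.duration._

// stop polling Kafka, drain in-flight messages so their offsets still get
// committed, then complete the stream
val done = control.drainAndShutdown()(actorSystem.executionContext)
Await.result(done, 10.seconds)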

Kafka setup

Java
// configure Kafka consumer (1)
ConsumerSettings<Integer, String> kafkaConsumerSettings =
        ConsumerSettings.create(actorSystem, new IntegerDeserializer(), new StringDeserializer())
                .withBootstrapServers(kafkaBootstrapServers)
                .withGroupId(groupId)
                .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                .withStopTimeout(Duration.ofSeconds(5));
Scala
// configure Kafka consumer (1)
val kafkaConsumerSettings = ConsumerSettings(actorSystem.toClassic, new IntegerDeserializer, new StringDeserializer)
  .withBootstrapServers(kafkaBootstrapServers)
  .withGroupId(groupId)
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  .withStopTimeout(0.seconds)
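
For local testing, movies can be pushed onto the topic with a plain Alpakka Kafka producer, serialized through the same Spray JSON format. A sketch reusing topic, kafkaBootstrapServers and actorSystem from above (the producer settings and the sample movies are assumptions, not part of the original snippets):

Scala
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer}
import samples.scaladsl.JsonFormats._
import spray.json._

val kafkaProducerSettings =
  ProducerSettings(actorSystem.toClassic, new IntegerSerializer, new StringSerializer)
    .withBootstrapServers(kafkaBootstrapServers)

// write a few movies as JSON strings, keyed by movie id
Source(List(Movie(23, "Psycho"), Movie(423, "Citizen Kane")))
  .map(movie => new ProducerRecord[Integer, String](topic, Int.box(movie.id), movie.toJson.compactPrint))
  .runWith(Producer.plainSink(kafkaProducerSettings))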

Elasticsearch setup

Java
private final String indexName = "movies";
Scala
val indexName = "movies"
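
Neither connectionSettings (referenced in the Scala flow) nor elasticsearchAddress (referenced in the Java flow) is shown above. A minimal sketch of that missing wiring, assuming Elasticsearch runs locally on the default port:

Scala
import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings

val connectionSettings = ElasticsearchConnectionSettings("http://localhost:9200")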