Spray JSONJackson conversion for the data class (3)
Elasticsearch setup (4)
Kafka consumer with committing support (5)
Parse message from Kafka to Movie and create Elasticsearch write message (6)
Use createWithContext to use an Elasticsearch flow with context-support (so it passes through the Kafka commit offset) (7)
React on write errors (8)
Let the Committer.flow aggregate commits to batches and commit to Kafka (9)
Combine consumer control and stream completion into DrainingControl (10)
Data class and JSON mapping
Java
sourcepackage samples.javadsl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
// Type in Elasticsearch (2)
public class Movie {
public final int id;
public final String title;
@JsonCreator
public Movie(@JsonProperty("id") int id, @JsonProperty("title") String title) {
this.id = id;
this.title = title;
}
@Override
public String toString() {
return "Movie(" + id + ", title=" + title + ")";
}
}
Scala
sourcepackage samples.scaladsl
import spray.json.DefaultJsonProtocol._
import spray.json._
// Type in Elasticsearch (2)
case class Movie(id: Int, title: String)
object JsonFormats {
// Spray JSON conversion setup (3)
implicit val movieFormat: JsonFormat[Movie] = jsonFormat2(Movie)
}