Tail files added to a directory and publish to Elasticsearch

This sample is a simplified take on Logstash, the utility that ships log files to Elasticsearch. The application uses Alpakka File to watch a directory for new file creation events and then tails the new files for their contents. Each log file is tailed until it is deleted or the stream completes. The date of each log line is parsed from one of several supported date-time formats (ISO 8601 zoned and local timestamps). A record comprising the following fields is created for every line.

  1. The full log line
  2. An extracted timestamp
  3. Directory
  4. Filename
  5. Line number

Each log line record is indexed into an Elasticsearch index called logs.
The application uses Alpakka Elasticsearch to index the log lines and, once the stream is complete, to query them back from the index.
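The date extraction itself is performed by a DateTimeExtractor that is referenced later in the walkthrough but not shown in this excerpt. A minimal Scala sketch of the idea, assuming the timestamp is the first whitespace-delimited token and that local timestamps are interpreted as UTC (the sample's actual implementation may differ):

import java.time.{LocalDateTime, ZoneOffset, ZonedDateTime}
import java.time.format.DateTimeParseException

// sketch only: try an ISO 8601 zoned timestamp first, then a local one (assumed UTC),
// and fall back to -1 when the line carries no parsable date
object DateTimeExtractorSketch {
  def extractDate(line: String): Long = {
    val token = line.takeWhile(!_.isWhitespace)
    try ZonedDateTime.parse(token).toInstant.toEpochMilli
    catch {
      case _: DateTimeParseException =>
        try LocalDateTime.parse(token).toInstant(ZoneOffset.UTC).toEpochMilli
        catch { case _: DateTimeParseException => -1L }
    }
  }
}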

When the stream completes, Alpakka Elasticsearch is used to query all indexed log lines and log them. A summary of the tailed log files is also logged; it includes the following fields (a sketch of the summary record follows the list).

  1. Directory
  2. Filename
  3. First seen timestamp
  4. Last updated timestamp
  5. Total log lines parsed
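A hedged sketch of what such a summary record could look like as a Scala type. The field names are inferred from the summarizeLogStatsFlow code further down; the LogFileSummaries alias is an assumption:

// sketch only: per-file summary record and the map the stream materializes
object LogFileSummarySketch {
  final case class LogFileSummary(
      directory: String,
      filename: String,
      firstSeen: Long,
      lastUpdated: Long,
      numberOfLines: Long)

  type LogFileSummaries = Map[(String, String), LogFileSummary]
}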

Browse the sources at GitHub.

To try out this project, clone the Alpakka Samples repository and find this sample in the alpakka-sample-file-to-elasticsearch directory.

Dependencies

Dependencies (sbt notation)
source
val AkkaVersion = "2.9.0"
val AlpakkaVersion = "7.0.2"
val AkkaDiagnosticsVersion = "2.1.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % AlpakkaVersion,
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
  "com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion,
  // for JSON in Scala
  "io.spray" %% "spray-json" % "1.3.6",
  // for JSON in Java
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % "2.15.2",
  "com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % "2.15.2",
  // Logging
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.11"
)

Example code walkthrough

Description

  • Data class mapped to Elasticsearch (1)
  • Spray JSON (Scala) / Jackson (Java) conversion for the data class (2)
  • Create the DirectoryChangesSource (3) to watch for file events in a given directory (4)
  • Only watch for file creation events (5)
  • Create a new FileTailSource and return it as a sub-stream (6)
  • Create FileTailSource for a given path (7)
  • Count each line from the log file (8)
  • Extract the date timestamp from the log line (9)
  • Create a LogLine record (10)
  • Create an ES index wrapper message for LogLine (11)
  • Use Alpakka Elasticsearch to create a new LogLine record (12)
  • Track statistics per log file (13)
  • Use Alpakka Elasticsearch to return all entries from the provided index (14)
  • Compose the stream together starting with the DirectoryChangesSource (15)
  • Run the graph and capture the materialized values (16)
  • Run a new graph to query all records from Elasticsearch and get the results (17)

Data class and JSON mapping

Java
source
// Type in Elasticsearch (2)
public class LogLine {
  public final String line;
  public final Long lineNo;
  public final Long date;
  public final String filename;
  public final String directory;

  @JsonCreator
  public LogLine(
      @JsonProperty("line") String line,
      @JsonProperty("lineNo") Long lineNo,
      @JsonProperty("date") Long date,
      @JsonProperty("filename") String filename,
      @JsonProperty("directory") String directory) {
    this.line = line;
    this.lineNo = lineNo;
    this.date = date;
    this.filename = filename;
    this.directory = directory;
  }

  @Override
  public java.lang.String toString() {
    return "LogLine(line=\"" + line + "\", lineNo=" + lineNo.toString() + ", date=" + date.toString()
        + ", filename=\"" + filename + "\", directory=\"" + directory + "\")";
  }
}

class JsonMappers {
  // Jackson conversion setup (3)
  public final static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
  public final static ObjectWriter logLineWriter = mapper.writerFor(LogLine.class);
  public final static ObjectReader logLineReader = mapper.readerFor(LogLine.class);
}
Scala
source
// Type in Elasticsearch (2)
final case class LogLine(line: String, lineNo: Long, date: Long, filename: String, directory: String)

object JsonFormats {
  // Spray JSON conversion setup (3)
  implicit val logLineFormat: JsonFormat[LogLine] = jsonFormat5(LogLine)
}
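As a quick illustration (not part of the sample), serializing a LogLine with the Spray JSON format above could look like this; all values are made up:

import spray.json._
import JsonFormats._

// made-up values, only to show that the case class round-trips through spray-json
val sample = LogLine("2021-10-01T10:00:00Z started", 1L, 1633082400000L, "app.log", "./test-data/input")
val json: JsValue = sample.toJson
assert(json.convertTo[LogLine] == sample)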

Set up the DirectoryChangesSource

Java
source
private final static String testDataPath = "./test-data";
private final static String inputLogsPath = testDataPath + "/input";

// watch directory (4)
private Source<Pair<Path, DirectoryChange>, NotUsed> directoryChangesSource() {
  final FileSystem fs = FileSystems.getDefault();
  final Duration pollingInterval = Duration.ofSeconds(1);
  final int maxBufferSize = 1000;
  return DirectoryChangesSource.create(fs.getPath(inputLogsPath), pollingInterval, maxBufferSize); // source (3)
}
Scala
source
val testDataPath = "./test-data"
val inputLogsPath = s"$testDataPath/input"

// watch directory (4)
val fs = FileSystems.getDefault
val directoryChangesSource: Source[(Path, DirectoryChange), NotUsed] =
  DirectoryChangesSource(fs.getPath(inputLogsPath), pollInterval = 1.second, maxBufferSize = 1000) // source (3)

Tail log files and create a LogLine per line

Java
source
private Flow<Pair<Path, DirectoryChange>, Source<LogLine, NotUsed>, NotUsed> tailNewLogs() {
  return Flow.<Pair<Path, DirectoryChange>>create()
      // only watch for file creation events (5)
      .filter(pair -> pair.second() == DirectoryChange.Creation)
      .map(pair -> {
        Path path = pair.first();
        log.info("File create detected: {}", path.toString());
        // create a new `FileTailSource` and return it as a sub-stream (6)
        return fileTailSource(path);
      });
}

private Source<LogLine, NotUsed> fileTailSource(Path path) {
  String filename = path.getFileName().toString();
  String directory = path.getParent().toString();
  // create `FileTailSource` for a given `path` (7)
  return FileTailSource
      .createLines(path, 8192, Duration.ofMillis(250))
      .map(line -> {
        log.debug("Parsed > {}", line);
        return line;
      })
      .scan(new LogAcc(), (acc, line) -> {
        // count each line from the log file (8)
        Long lineNo = acc.lineNo + 1;
        // extract the date timestamp from the log line (9)
        Long date = DateTimeExtractor.extractDate(line);
        // create a `LogLine` record (10)
        LogLine logLine = new LogLine(line, lineNo, date, filename, directory);
        return new LogAcc(lineNo, Optional.of(logLine));
      })
      .mapConcat(logAcc -> {
        if (logAcc.logLine.isPresent()) {
          return Collections.singletonList(logAcc.logLine.get());
        } else {
          return Collections.emptyList();
        }
      });
}
Scala
source
val tailNewLogs: Flow[(Path, DirectoryChange), Source[LogLine, NotUsed], NotUsed] =
  Flow[(Path, DirectoryChange)]
    // only watch for file creation events (5)
    .collect { case (path, DirectoryChange.Creation) => path }
    .map { path =>
      log.info("File create detected: {}", path)
      // create a new `FileTailSource` and return it as a sub-stream (6)
      fileTailSource(path)
    }

def fileTailSource(path: Path): Source[LogLine, NotUsed] = {
  val filename = path.getFileName.toString
  val directory = path.getParent.toString
  // create `FileTailSource` for a given `path` (7)
  FileTailSource
    .lines(path = path, maxLineSize = 8192, pollingInterval = 250.millis)
    .map { line =>
      log.debug("Parsed > {}", line)
      line
    }
    .scan(LogAcc()) { (acc, line) =>
      // count each line from the log file (8)
      val lineNo = acc.lineNo + 1
      // extract the date timestamp from the log line (9)
      val date = DateTimeExtractor.extractDate(line)
      // create a `LogLine` record (10)
      LogAcc(lineNo, Some(LogLine(line, lineNo, date, filename, directory)))
    }
    .mapConcat(_.logLine.toList)
}
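Both variants fold over a small LogAcc accumulator that is not shown in this excerpt. A minimal Scala sketch, with the fields inferred from how it is used above:

// sketch only: accumulator carrying the running line count and the latest parsed line
final case class LogAcc(lineNo: Long = 0L, logLine: Option[LogLine] = None)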

Index LogLines into Elasticsearch

Java
source
private Flow<LogLine, LogLine, NotUsed> elasticsearchIndexFlow() {
  return Flow.<LogLine>create()
      // create an ES index wrapper message for `LogLine` (11)
      .map(WriteMessage::createIndexMessage)
      // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
      // takes `ObjectMapper` for `LogLine` for serialization
      .via(ElasticsearchFlow.create(
          ElasticsearchParams.V5(indexName, typeName),
          ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
          JsonMappers.mapper))
      .map(writeResult -> {
        writeResult
            .getError()
            .ifPresent(errorJson -> {
              throw new RuntimeException(
                  "Elasticsearch update failed " + writeResult.getErrorReason().orElse(errorJson));
            });
        return writeResult.message().source().get();
      });
}
Scala
source
val elasticsearchIndexFlow: Flow[LogLine, LogLine, NotUsed] =
  Flow[LogLine]
    // create an ES index wrapper message for `LogLine` (11)
    .map(WriteMessage.createIndexMessage[LogLine])
    // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
    // implicitly takes `JsonFormat` for `LogLine` for serialization
    .via(ElasticsearchFlow.create(
      ElasticsearchParams.V5(indexName, typeName),
      ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5)))
    .mapConcat { writeResult =>
      writeResult.error.foreach { errorJson =>
        throw new RuntimeException(s"Elasticsearch index failed ${writeResult.errorReason.getOrElse(errorJson)}")
      }
      writeResult.message.source.toList
    }
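Both variants reference indexName, typeName, and connectionSettings, which are defined elsewhere in the sample. A minimal Scala sketch of plausible values; the URL and type name here are assumptions, only the logs index name comes from the overview above:

import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings

// assumed values; the sample defines its own
val indexName = "logs"
val typeName = "_doc" // assumed mapping type for the V5 API params
val connectionSettings = ElasticsearchConnectionSettings("http://localhost:9200") // assumed local Elasticsearch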

Log file statistics summary

Java
source
private Flow<LogLine, HashMap<Pair<String, String>, LogFileSummary>, NotUsed> summarizeLogStatsFlow() {
  return Flow.<LogLine>create()
      // track statistics per log file (13)
      .scan(new HashMap<>(), (summaries, logLine) -> {
        Pair<String, String> key = Pair.create(logLine.directory, logLine.filename);
        Long timestamp = RunOps.now();
        if (summaries.containsKey(key)) {
          LogFileSummary summary = summaries.get(key);
          LogFileSummary newSummary =
              new LogFileSummary(summary.directory, summary.filename, summary.firstSeen, timestamp, logLine.lineNo);
          summaries.put(key, newSummary);
        } else {
          LogFileSummary newSummary =
              new LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo);
          summaries.put(key, newSummary);
        }
        return summaries;
      });
}
Scala
source
val summarizeLogStatsFlow: Flow[LogLine, LogFileSummaries, NotUsed] =
  Flow[LogLine]
    // track statistics per log file (13)
    .scan(Map[(String, String), LogFileSummary]()) { (summaries, logLine) =>
      val key = (logLine.directory, logLine.filename)
      val timestamp = now()
      val summary = summaries
        .get(key)
        .map(_.copy(lastUpdated = timestamp, numberOfLines = logLine.lineNo))
        .getOrElse(LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo))
      summaries + (key -> summary)
    }

Query Elasticsearch

Java
source
private CompletionStage<List<LogLine>> queryAllRecordsFromElasticsearch(String indexName) {
  CompletionStage<List<LogLine>> reading =
      // use Alpakka Elasticsearch to return all entries from the provided index (14)
      ElasticsearchSource
          .typed(
              ElasticsearchParams.V5(indexName, typeName),
              "{\"match_all\": {}}",
              ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
              LogLine.class)
          .map(ReadResult::source)
          .runWith(Sink.seq(), system);
  reading.thenAccept(non -> log.info("Reading finished"));
  return reading;
}
Scala
source
def queryAllRecordsFromElasticsearch(indexName: String): Future[immutable.Seq[LogLine]] = {
  val reading = ElasticsearchSource
    // use Alpakka Elasticsearch to return all entries from the provided index (14)
    .typed[LogLine](
      ElasticsearchParams.V5(indexName, typeName),
      """{"match_all": {}}""",
      ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5))
    .map(_.source)
    .runWith(Sink.seq)
  reading.foreach(_ => log.info("Reading finished"))
  reading
}

Composing everything together

Java
source
// compose stream together starting with the `DirectoryChangesSource` (15)
RunnableGraph<Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>>> graph =
    directoryChangesSource()
        // create `FileTailSource` sub-streams
        .via(tailNewLogs())
        // merge the sub-streams together and emit all file `LogLine` records downstream
        .flatMapMerge(streamParallelism, source -> source)
        // index into Elasticsearch
        .via(elasticsearchIndexFlow())
        // summarize log statistics
        .via(summarizeLogStatsFlow())
        // create a `KillSwitch` so we can shut down the stream from the outside. use this as the materialized value.
        .viaMat(KillSwitches.single(), Keep.right())
        // materialize the last recorded log stats summarization.
        // return both a `UniqueKillSwitch` and a `CompletionStage<Map<Pair<String, String>, LogFileSummary>>`
        .toMat(Sink.last(), Keep.both());
Scala
source
// compose stream together starting with the `DirectoryChangesSource` (15)
val graph: RunnableGraph[(UniqueKillSwitch, Future[LogFileSummaries])] =
  directoryChangesSource
    // create `FileTailSource` sub-streams
    .via(tailNewLogs)
    // merge the sub-streams together and emit all file `LogLine` records downstream
    .flatMapMerge(streamParallelism, identity)
    // index into Elasticsearch
    .via(elasticsearchIndexFlow)
    // summarize log statistics
    .via(summarizeLogStatsFlow)
    // create a `KillSwitch` so we can shut down the stream from the outside. use this as the materialized value.
    .viaMat(KillSwitches.single)(Keep.right)
    // materialize the last recorded log stats summarization.
    // return both a `UniqueKillSwitch` and a `Future[LogFileSummaries]`
    .toMat(Sink.last)(Keep.both)
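The graph references streamParallelism, and the run code below references streamRuntime; both are defined elsewhere in the sample. Placeholder values, purely as assumptions:

import scala.concurrent.duration._

// assumed values; the sample chooses its own
val streamParallelism = 10     // how many tailed files are merged concurrently
val streamRuntime = 10.seconds // how long the indexing stream is left running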

Running the application

Java
source
RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

// run the graph and capture the materialized values (16)
Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>> running = graph.run(system);
UniqueKillSwitch control = running.first();
CompletionStage<HashMap<Pair<String, String>, LogFileSummary>> stream = running.second();

RunOps.copyTestDataTo(testDataPath, inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

log.info("Running index stream for {}", streamRuntime.toString());
Thread.sleep(streamRuntime.toMillis());
log.info("Shutting down index stream");
control.shutdown();
log.info("Wait for index stream to shutdown");

Map<Pair<String, String>, LogFileSummary> summaries = stream.toCompletableFuture().get(10, TimeUnit.SECONDS);

// run a new graph to query all records from Elasticsearch and get the results (17)
List<LogLine> results = queryAllRecordsFromElasticsearch(indexName).toCompletableFuture().get(10, TimeUnit.SECONDS);

printResults(results, summaries);

RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

return RunOps.shutdown(system);
Scala
source
val run = for {
  _ <- deleteAllFilesFrom(inputLogsPath)
  // run the graph and capture the materialized values (16)
  (control, stream) = graph.run()
  _ <- copyTestDataTo(testDataPath, inputLogsPath)
  summaries <- runStreamForAwhileAndShutdown(streamRuntime, control, stream)
  // run a new graph to query all records from Elasticsearch and get the results (17)
  results <- queryAllRecordsFromElasticsearch(indexName)
  _ = printResults(results, summaries)
  _ <- deleteAllFilesFrom(inputLogsPath)
  _ <- shutdown(actorSystem)
} yield ()

Await.result(run, streamRuntime + 20.seconds)
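The Scala variant delegates the sleep-then-shutdown sequence to a runStreamForAwhileAndShutdown helper that is not shown in this excerpt. A sketch of what it might look like, mirroring the Java run sequence above:

import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.stream.UniqueKillSwitch

// sketch only: let the stream run for `streamRuntime`, then trigger the kill switch
// and hand back the materialized summaries future
def runStreamForAwhileAndShutdown(
    streamRuntime: FiniteDuration,
    control: UniqueKillSwitch,
    stream: Future[LogFileSummaries]): Future[LogFileSummaries] = {
  log.info("Running index stream for {}", streamRuntime)
  Thread.sleep(streamRuntime.toMillis)
  log.info("Shutting down index stream")
  control.shutdown()
  log.info("Wait for index stream to shutdown")
  stream
}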

All Alpakka samples

Show Alpakka samples listing.
