Full source
The complete source of the example application, in both Java and Scala.
Main
- Java
package samples.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.alpakka.elasticsearch.*;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.alpakka.file.javadsl.FileTailSource;
import akka.stream.javadsl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import samples.common.DateTimeExtractor;

import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Main {

  private static final Logger log = LoggerFactory.getLogger(Main.class);

  private ActorSystem<?> system;
  private final ElasticsearchConnectionSettings connectionSettings;

  public Main() {
    connectionSettings = ElasticsearchConnectionSettings.create(RunOps.elasticsearchAddress());
  }

  private final static Duration streamRuntime = Duration.ofSeconds(10L);
  private final static Integer streamParallelism = 47;

  private final static String indexName = "logs";
  private final static String typeName = "_doc";

  // #directory-change-source
  private final static String testDataPath = "./test-data";
  private final static String inputLogsPath = testDataPath + "/input";

  // watch directory (4)
  private Source<Pair<Path, DirectoryChange>, NotUsed> directoryChangesSource() {
    final FileSystem fs = FileSystems.getDefault();
    final Duration pollingInterval = Duration.ofSeconds(1);
    final int maxBufferSize = 1000;
    return DirectoryChangesSource.create(fs.getPath(inputLogsPath), pollingInterval, maxBufferSize); // source (3)
  }
  // #directory-change-source

  // #tail-logs
  private Flow<Pair<Path, DirectoryChange>, Source<LogLine, NotUsed>, NotUsed> tailNewLogs() {
    return Flow.<Pair<Path, DirectoryChange>>create()
        // only watch for file creation events (5)
        .filter(pair -> pair.second() == DirectoryChange.Creation)
        .map(pair -> {
          Path path = pair.first();
          log.info("File create detected: {}", path.toString());
          // create a new `FileTailSource` and return it as a sub-stream (6)
          return fileTailSource(path);
        });
  }

  private Source<LogLine, NotUsed> fileTailSource(Path path) {
    String filename = path.getFileName().toString();
    String directory = path.getParent().toString();

    // create `FileTailSource` for a given `path` (7)
    return FileTailSource
        .createLines(path, 8192, Duration.ofMillis(250))
        .map(line -> {
          log.debug("Parsed > {}", line);
          return line;
        })
        .scan(new LogAcc(), (acc, line) -> {
          // count each line from the log file (8)
          Long lineNo = acc.lineNo + 1;
          // extract the date timestamp from the log line (9)
          Long date = DateTimeExtractor.extractDate(line);
          // create a `LogLine` record (10)
          LogLine logLine = new LogLine(line, lineNo, date, filename, directory);
          return new LogAcc(lineNo, Optional.of(logLine));
        })
        .mapConcat(logAcc -> {
          if (logAcc.logLine.isPresent()) {
            return Collections.singletonList(logAcc.logLine.get());
          } else {
            return Collections.emptyList();
          }
        });
  }
  // #tail-logs

  // #es-index-flow
  private Flow<LogLine, LogLine, NotUsed> elasticsearchIndexFlow() {
    return Flow.<LogLine>create()
        // create an ES index wrapper message for `LogLine` (11)
        .map(WriteMessage::createIndexMessage)
        // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
        // takes `ObjectMapper` for `LogLine` for serialization
        .via(ElasticsearchFlow.create(
            ElasticsearchParams.V5(indexName, typeName),
            ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
            JsonMappers.mapper))
        .map(writeResult -> {
          writeResult
              .getError()
              .ifPresent(errorJson -> {
                throw new RuntimeException("Elasticsearch update failed "
                    + writeResult.getErrorReason().orElse(errorJson));
              });
          return writeResult.message().source().get();
        });
  }
  // #es-index-flow

  // #summarize-log-stats-flow
  private Flow<LogLine, HashMap<Pair<String, String>, LogFileSummary>, NotUsed> summarizeLogStatsFlow() {
    return Flow.<LogLine>create()
        // track statistics per log file (13)
        .scan(new HashMap<>(), (summaries, logLine) -> {
          Pair<String, String> key = Pair.create(logLine.directory, logLine.filename);
          Long timestamp = RunOps.now();
          if (summaries.containsKey(key)) {
            LogFileSummary summary = summaries.get(key);
            LogFileSummary newSummary =
                new LogFileSummary(summary.directory, summary.filename, summary.firstSeen, timestamp, logLine.lineNo);
            summaries.put(key, newSummary);
          } else {
            LogFileSummary newSummary =
                new LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo);
            summaries.put(key, newSummary);
          }
          return summaries;
        });
  }
  // #summarize-log-stats-flow

  // #query-elasticsearch
  private CompletionStage<List<LogLine>> queryAllRecordsFromElasticsearch(String indexName) {
    CompletionStage<List<LogLine>> reading =
        // use Alpakka Elasticsearch to return all entries from the provided index (14)
        ElasticsearchSource
            .typed(
                ElasticsearchParams.V5(indexName, typeName),
                "{\"match_all\": {}}",
                ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
                LogLine.class)
            .map(ReadResult::source)
            .runWith(Sink.seq(), system);
    reading.thenAccept(non -> log.info("Reading finished"));
    return reading;
  }
  // #query-elasticsearch

  private void printResults(List<LogLine> results, Map<Pair<String, String>, LogFileSummary> summaries) {
    results.stream().forEach(result -> log.debug("Results < {}", result.toString()));

    String fmt = "%-32s%-32s%-16s%-16s%s";
    String header = String.format(fmt, "Directory", "File", "First Seen", "Last Updated", "Number of Lines");
    String summariesStr = summaries
        .values()
        .stream()
        .map(s -> String.format(fmt, s.directory, s.filename, s.firstSeen, s.lastUpdated, s.numberOfLines))
        .collect(Collectors.joining("\n"));
    log.info("LogFileSummaries:\n{}\n{}", header, summariesStr);
  }

  private CompletionStage<Done> run() throws Exception {
    this.system = ActorSystem.create(Behaviors.empty(), "FileToElasticSearch");

    // #stream-composing
    // compose stream together starting with the `DirectoryChangesSource` (15)
    RunnableGraph<Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>>> graph =
        directoryChangesSource()
            // create `FileTailSource` sub-streams
            .via(tailNewLogs())
            // merge the sub-streams together and emit all file `LogLine` records downstream
            .flatMapMerge(streamParallelism, source -> source)
            // index into Elasticsearch
            .via(elasticsearchIndexFlow())
            // summarize log statistics
            .via(summarizeLogStatsFlow())
            // create a `KillSwitch` so we can shutdown the stream from the outside. use this as the materialized value.
            .viaMat(KillSwitches.single(), Keep.right())
            // materialize the last recorded log stats summarization.
            // return both a `UniqueKillSwitch` and a `CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>`
            .toMat(Sink.last(), Keep.both());
    // #stream-composing

    // #running-the-app
    RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

    // run the graph and capture the materialized values (16)
    Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>> running = graph.run(system);
    UniqueKillSwitch control = running.first();
    CompletionStage<HashMap<Pair<String, String>, LogFileSummary>> stream = running.second();

    RunOps.copyTestDataTo(testDataPath, inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

    log.info("Running index stream for {}", streamRuntime.toString());
    Thread.sleep(streamRuntime.toMillis());
    log.info("Shutting down index stream");
    control.shutdown();
    log.info("Wait for index stream to shutdown");

    Map<Pair<String, String>, LogFileSummary> summaries = stream.toCompletableFuture().get(10, TimeUnit.SECONDS);

    // run a new graph to query all records from Elasticsearch and get the results (17)
    List<LogLine> results = queryAllRecordsFromElasticsearch(indexName).toCompletableFuture().get(10, TimeUnit.SECONDS);

    printResults(results, summaries);

    RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

    return RunOps.shutdown(system);
    // #running-the-app
  }

  public static void main(String[] args) throws Exception {
    Main main = new Main();
    CompletionStage<Done> run = main.run();
    run.toCompletableFuture().get(10, TimeUnit.SECONDS);
  }
}
- Scala
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples.scaladsl

import java.nio.file._

import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSource}
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.{DirectoryChangesSource, FileTailSource}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import samples.common.DateTimeExtractor
import samples.scaladsl.LogFileSummary.LogFileSummaries

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object Main extends App {

  import RunOps._
  import JsonFormats._

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "FileToElasticSearch")
  implicit val executionContext: ExecutionContext = actorSystem.executionContext

  val streamRuntime = 10.seconds
  val streamParallelism = 47

  val connectionSettings = ElasticsearchConnectionSettings.create(RunOps.elasticsearchAddress)

  val indexName = "logs"
  val typeName = "_doc"

  // #directory-change-source
  val testDataPath = "./test-data"
  val inputLogsPath = s"$testDataPath/input"

  // watch directory (4)
  val fs = FileSystems.getDefault
  val directoryChangesSource: Source[(Path, DirectoryChange), NotUsed] =
    DirectoryChangesSource(fs.getPath(inputLogsPath), pollInterval = 1.second, maxBufferSize = 1000) // source (3)
  // #directory-change-source

  // #tail-logs
  val tailNewLogs: Flow[(Path, DirectoryChange), Source[LogLine, NotUsed], NotUsed] =
    Flow[(Path, DirectoryChange)]
      // only watch for file creation events (5)
      .collect { case (path, DirectoryChange.Creation) => path }
      .map { path =>
        log.info("File create detected: {}", path)
        // create a new `FileTailSource` and return it as a sub-stream (6)
        fileTailSource(path)
      }

  def fileTailSource(path: Path): Source[LogLine, NotUsed] = {
    val filename = path.getFileName.toString
    val directory = path.getParent.toString

    // create `FileTailSource` for a given `path` (7)
    FileTailSource
      .lines(
        path = path,
        maxLineSize = 8192,
        pollingInterval = 250.millis
      )
      .map { line =>
        log.debug("Parsed > {}", line)
        line
      }
      .scan(LogAcc()) { (acc, line) =>
        // count each line from the log file (8)
        val lineNo = acc.lineNo + 1
        // extract the date timestamp from the log line (9)
        val date = DateTimeExtractor.extractDate(line)
        LogAcc(lineNo,
          // create a `LogLine` record (10)
          Some(LogLine(line, lineNo, date, filename, directory))
        )
      }
      .mapConcat(_.logLine.toList)
  }
  // #tail-logs

  // #es-index-flow
  val elasticsearchIndexFlow: Flow[LogLine, LogLine, NotUsed] =
    Flow[LogLine]
      // create an ES index wrapper message for `LogLine` (11)
      .map(WriteMessage.createIndexMessage[LogLine])
      // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
      // implicitly takes `JsonFormat` for `LogLine` for serialization
      .via(ElasticsearchFlow.create(ElasticsearchParams.V5(indexName, typeName),
        ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5)))
      .mapConcat { writeResult =>
        writeResult.error.foreach { errorJson =>
          throw new RuntimeException(s"Elasticsearch index failed ${writeResult.errorReason.getOrElse(errorJson)}")
        }
        writeResult.message.source.toList
      }
  // #es-index-flow

  // #summarize-log-stats-flow
  val summarizeLogStatsFlow: Flow[LogLine, LogFileSummaries, NotUsed] =
    Flow[LogLine]
      // track statistics per log file (13)
      .scan(Map[(String, String), LogFileSummary]()) { (summaries, logLine) =>
        val key = (logLine.directory, logLine.filename)
        val timestamp = now()
        val summary = summaries
          .get(key)
          .map(_.copy(lastUpdated = timestamp, numberOfLines = logLine.lineNo))
          .getOrElse(LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo))
        summaries + (key -> summary)
      }
  // #summarize-log-stats-flow

  // #query-elasticsearch
  def queryAllRecordsFromElasticsearch(indexName: String): Future[immutable.Seq[LogLine]] = {
    val reading = ElasticsearchSource
      // use Alpakka Elasticsearch to return all entries from the provided index (14)
      .typed[LogLine](ElasticsearchParams.V5(indexName, typeName), """{"match_all": {}}""",
        ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5))
      .map(_.source)
      .runWith(Sink.seq)
    reading.foreach(_ => log.info("Reading finished"))
    reading
  }
  // #query-elasticsearch

  def runStreamForAwhileAndShutdown(waitInterval: FiniteDuration,
                                    control: UniqueKillSwitch,
                                    stream: Future[LogFileSummaries])(implicit mat: Materializer): Future[LogFileSummaries] = {
    log.info("Running index stream for {}", waitInterval)
    Thread.sleep(waitInterval.toMillis)
    log.info("Shutting down index stream")
    control.shutdown()
    log.info("Wait for index stream to shutdown")
    stream
  }

  def printResults(results: Seq[LogLine], summaries: LogFileSummaries): Unit = {
    results.foreach(m => log.debug("Results < {}", m))

    val fmt = "%-32s%-32s%-16s%-16s%s"
    val header = fmt.format("Directory", "File", "First Seen", "Last Updated", "Number of Lines")
    val summariesStr = summaries.values.map { summary =>
      import summary._
      fmt.format(directory, filename, firstSeen, lastUpdated, numberOfLines)
    }.mkString("\n")
    log.info("LogFileSummaries:\n{}\n{}", header, summariesStr)
  }

  // #stream-composing
  // compose stream together starting with the `DirectoryChangesSource` (15)
  val graph: RunnableGraph[(UniqueKillSwitch, Future[LogFileSummaries])] =
    directoryChangesSource
      // create `FileTailSource` sub-streams
      .via(tailNewLogs)
      // merge the sub-streams together and emit all file `LogLine` records downstream
      .flatMapMerge(streamParallelism, identity)
      // index into Elasticsearch
      .via(elasticsearchIndexFlow)
      // summarize log statistics
      .via(summarizeLogStatsFlow)
      // create a `KillSwitch` so we can shutdown the stream from the outside. use this as the materialized value.
      .viaMat(KillSwitches.single)(Keep.right)
      // materialize the last recorded log stats summarization.
      // return both a `UniqueKillSwitch` and a `Future[LogFileSummaries]`
      .toMat(Sink.last)(Keep.both)
  // #stream-composing

  // #running-the-app
  val run = for {
    _ <- deleteAllFilesFrom(inputLogsPath)
    // run the graph and capture the materialized values (16)
    (control, stream) = graph.run()
    _ <- copyTestDataTo(testDataPath, inputLogsPath)
    summaries <- runStreamForAwhileAndShutdown(streamRuntime, control, stream)
    // run a new graph to query all records from Elasticsearch and get the results (17)
    results <- queryAllRecordsFromElasticsearch(indexName)
    _ = printResults(results, summaries)
    _ <- deleteAllFilesFrom(inputLogsPath)
    _ <- shutdown(actorSystem)
  } yield ()

  Await.result(run, streamRuntime + 20.seconds)
  // #running-the-app
}
LogLine
- Java
package samples.javadsl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.util.Optional;

// #logline
// Type in Elasticsearch (2)
public class LogLine {
  public final String line;
  public final Long lineNo;
  public final Long date;
  public final String filename;
  public final String directory;

  @JsonCreator
  public LogLine(
      @JsonProperty("line") String line,
      @JsonProperty("lineNo") Long lineNo,
      @JsonProperty("date") Long date,
      @JsonProperty("filename") String filename,
      @JsonProperty("directory") String directory) {
    this.line = line;
    this.lineNo = lineNo;
    this.date = date;
    this.filename = filename;
    this.directory = directory;
  }

  @Override
  public java.lang.String toString() {
    return "LogLine(line=\"" + line + "\", lineNo=" + lineNo.toString() + ", date=" + date.toString()
        + ", filename=\"" + filename + "\", directory=\"" + directory + "\")";
  }
}

class JsonMappers {
  // Jackson conversion setup (3)
  public final static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
  public final static ObjectWriter logLineWriter = mapper.writerFor(LogLine.class);
  public final static ObjectReader logLineReader = mapper.readerFor(LogLine.class);
}
// #logline

class LogAcc {
  public final Long lineNo;
  public final Optional<LogLine> logLine;

  public LogAcc() {
    this.lineNo = 0L;
    this.logLine = Optional.empty();
  }

  public LogAcc(Long lineNo, Optional<LogLine> logLine) {
    this.lineNo = lineNo;
    this.logLine = logLine;
  }
}

class LogFileSummary {
  public final String directory;
  public final String filename;
  public final Long firstSeen;
  public final Long lastUpdated;
  public final Long numberOfLines;

  public LogFileSummary(String directory, String filename, Long firstSeen, Long lastUpdated, Long numberOfLines) {
    this.directory = directory;
    this.filename = filename;
    this.firstSeen = firstSeen;
    this.lastUpdated = lastUpdated;
    this.numberOfLines = numberOfLines;
  }
}
- Scala
package samples.scaladsl

import spray.json.DefaultJsonProtocol._
import spray.json._

// #logline
// Type in Elasticsearch (2)
final case class LogLine(line: String, lineNo: Long, date: Long, filename: String, directory: String)

object JsonFormats {
  // Spray JSON conversion setup (3)
  implicit val logLineFormat: JsonFormat[LogLine] = jsonFormat5(LogLine)
}
// #logline

final case class LogAcc(lineNo: Long = 0L, logLine: Option[LogLine] = None)

object LogFileSummary {
  type LogFileSummaries = Map[(String, String), LogFileSummary]
}

final case class LogFileSummary(directory: String,
                                filename: String,
                                firstSeen: Long,
                                lastUpdated: Long,
                                numberOfLines: Long)
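
The `logLineFormat` above is the implicit `JsonFormat` that the Alpakka Elasticsearch flow and source use to (de)serialize `LogLine`. As a minimal sketch (not part of the sample; the field values and the `LogLineJsonDemo` name are invented for illustration), it round-trips a `LogLine` through JSON like this:

import samples.scaladsl.LogLine
import samples.scaladsl.JsonFormats._
import spray.json._

object LogLineJsonDemo extends App {
  // invented values for illustration; `date` is just an arbitrary epoch-millis number
  val logLine = LogLine("2019-09-20 21:18:24,774 INFO started", lineNo = 1L, date = 1568993904774L,
    filename = "app.log", directory = "./test-data/input")

  val json: String = logLine.toJson.compactPrint          // a JSON object carrying the five fields
  val parsed: LogLine = json.parseJson.convertTo[LogLine] // back to the case class

  assert(parsed == logLine) // jsonFormat5 round-trips the case class unchanged
}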
RunOps
- Java
package samples.javadsl

import java.nio.file.Path
import java.util.concurrent.CompletionStage

import akka.Done
import akka.actor.typed.ActorSystem
import samples.scaladsl

import scala.compat.java8.FutureConverters._

object RunOps {

  val elasticsearchAddress: String = scaladsl.RunOps.elasticsearchAddress

  def stopContainers(): Unit = scaladsl.RunOps.stopContainers()

  def now(): Long = scaladsl.RunOps.now()

  def listFiles(path: String)(implicit system: ActorSystem[_]): CompletionStage[Seq[Path]] = {
    scaladsl.RunOps.listFiles(path).toJava
  }

  def copyTestDataTo(source: String, destination: String)(implicit system: ActorSystem[_]): CompletionStage[Unit] = {
    scaladsl.RunOps.copyTestDataTo(source, destination).toJava
  }

  def deleteAllFilesFrom(path: String)(implicit system: ActorSystem[_]): CompletionStage[Unit] = {
    scaladsl.RunOps.deleteAllFilesFrom(path).toJava
  }

  def shutdown(actorSystem: ActorSystem[_]): CompletionStage[Done] = {
    scaladsl.RunOps.shutdown(actorSystem).toJava
  }
}
- Scala
package samples.scaladsl

import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.time.ZonedDateTime

import akka.Done
import akka.actor.typed.ActorSystem
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{Keep, Sink}
import org.slf4j.LoggerFactory
import org.testcontainers.elasticsearch.ElasticsearchContainer

import scala.concurrent.Future

object RunOps {

  final val log = LoggerFactory.getLogger(getClass)

  // Testcontainers: start Elasticsearch in Docker
  private val elasticsearchContainer =
    new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2")
  elasticsearchContainer.start()

  private[samples] val elasticsearchAddress: String = "http://" + elasticsearchContainer.getHttpHostAddress

  def stopContainers(): Unit = {
    elasticsearchContainer.stop()
  }

  def now(): Long = ZonedDateTime.now.toInstant.toEpochMilli

  def listFiles(path: String)(implicit system: ActorSystem[_]): Future[Seq[Path]] =
    Directory.ls(Paths.get(path)).filterNot(Files.isDirectory(_)).toMat(Sink.seq)(Keep.right).run()

  def copyTestDataTo(source: String, destination: String)(implicit system: ActorSystem[_]): Future[Unit] = {
    implicit val ec = system.executionContext
    for {
      sourceFiles <- listFiles(source)
    } yield sourceFiles foreach { sourceFile =>
      val destFile = Paths.get(destination, sourceFile.getFileName.toString)
      log.info(s"Copying file $sourceFile to $destFile")
      Files.copy(sourceFile, destFile, StandardCopyOption.REPLACE_EXISTING)
    }
  }

  def deleteAllFilesFrom(path: String)(implicit system: ActorSystem[_]): Future[Unit] = {
    implicit val ec = system.executionContext
    for {
      files <- listFiles(path)
    } yield files filterNot (_.getFileName.toString == ".gitignore") foreach { file =>
      log.info(s"Deleting file: $file")
      Files.delete(file)
    }
  }

  def shutdown(system: ActorSystem[_]): Future[Done] = {
    log.info(s"Stop containers")
    stopContainers()
    log.info(s"Kill actor system")
    system.terminate()
    system.whenTerminated
  }
}
DateTimeExtractors
package samples.common

import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset, ZonedDateTime}

import scala.util.matching.Regex

object DateTimeExtractor {

  val dateTimeExtractors = List(new ZonedDateTimeExtractor, new LocalDateTimeExtractor)

  def extractDate(line: String): Long = {
    dateTimeExtractors
      .view
      .map(_.maybeParse(line))
      .collectFirst {
        case Some(d) => d
      }
      .getOrElse(-1L)
  }

  sealed trait DateTimeExtractor {
    def regex: Regex
    def parse(dateStr: String): Long
    def maybeParse(dateStr: String): Option[Long] = {
      val matched: Option[String] = regex.findFirstIn(dateStr)
      matched.map(parse)
    }
  }

  /**
   * ZonedDateTime
   * Ex)
   * 2016-01-19T15:21:32.59+02:00
   * https://regex101.com/r/LYluKk/4
   */
  final class ZonedDateTimeExtractor extends DateTimeExtractor {
    val regex: Regex = """((?:(\d{4}-\d{2}-\d{2})[T| ](\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})+)""".r
    def parse(dateStr: String): Long =
      ZonedDateTime.parse(dateStr, DateTimeFormatter.ISO_ZONED_DATE_TIME).toInstant.toEpochMilli
  }

  /**
   * LocalDateTime
   * Ex)
   * 2019-09-20 21:18:24,774
   * https://regex101.com/r/LYluKk/3
   */
  final class LocalDateTimeExtractor extends DateTimeExtractor {
    private val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd kk:mm:ss,SSS")
    val regex: Regex = """((?:(\d{4}-\d{2}-\d{2})[T| ](\d{2}:\d{2}:\d{2}(?:\.\d+)?)),(\d{3}?))""".r
    def parse(dateStr: String): Long =
      LocalDateTime.parse(dateStr, pattern).toInstant(ZoneOffset.UTC).toEpochMilli
  }
}
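
As a rough illustration (not part of the sample; the log lines and the `DateTimeExtractorDemo` name are invented), `extractDate` recognises the two formats documented above and falls back to `-1L` when a line carries no timestamp:

import samples.common.DateTimeExtractor

object DateTimeExtractorDemo extends App {
  // matched by ZonedDateTimeExtractor (ISO zoned timestamp with an offset)
  println(DateTimeExtractor.extractDate("2016-01-19T15:21:32.59+02:00 INFO starting up"))
  // matched by LocalDateTimeExtractor ("yyyy-MM-dd kk:mm:ss,SSS", interpreted as UTC)
  println(DateTimeExtractor.extractDate("2019-09-20 21:18:24,774 DEBUG polling directory"))
  // no extractor matches, so extractDate returns -1L
  println(DateTimeExtractor.extractDate("a log line without any timestamp"))
}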