Full source

The full example.

Main

Java
package samples.javadsl;

import akka.Done;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.alpakka.elasticsearch.*;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchFlow;
import akka.stream.alpakka.elasticsearch.javadsl.ElasticsearchSource;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.alpakka.file.javadsl.FileTailSource;
import akka.stream.javadsl.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import samples.common.DateTimeExtractor;

import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class Main {

    private static final Logger log = LoggerFactory.getLogger(Main.class);

    private ActorSystem<?> system;

    private final ElasticsearchConnectionSettings connectionSettings;

    public Main() {
        connectionSettings = ElasticsearchConnectionSettings.create(RunOps.elasticsearchAddress());
    }

    private final static Duration streamRuntime = Duration.ofSeconds(10L);
    private final static Integer streamParallelism = 47;

    private final static String indexName = "logs";
    private final static String typeName = "_doc";

    // #directory-change-source

    private final static String testDataPath = "./test-data";
    private final static String inputLogsPath = testDataPath + "/input"; // watch directory (4)

    private Source<Pair<Path, DirectoryChange>, NotUsed> directoryChangesSource() {
        final FileSystem fs = FileSystems.getDefault();
        final Duration pollingInterval = Duration.ofSeconds(1);
        final int maxBufferSize = 1000;
        return DirectoryChangesSource.create(fs.getPath(inputLogsPath), pollingInterval, maxBufferSize); // source (3)
    }

    // #directory-change-source

    // #tail-logs

    private Flow<Pair<Path, DirectoryChange>, Source<LogLine, NotUsed>, NotUsed> tailNewLogs() {
        return Flow.<Pair<Path, DirectoryChange>>create()
                // only watch for file creation events (5)
                .filter(pair -> pair.second() == DirectoryChange.Creation)
                .map(pair -> {
                    Path path = pair.first();
                    log.info("File create detected: {}", path.toString());
                    // create a new `FileTailSource` and return it as a sub-stream (6)
                    return fileTailSource(path);
                });
    }

    private Source<LogLine, NotUsed> fileTailSource(Path path) {
        String filename = path.getFileName().toString();
        String directory = path.getParent().toString();

        // create `FileTailSource` for a given `path` (7)
        return FileTailSource
                .createLines(path, 8192, Duration.ofMillis(250))
                .map(line -> {
                    log.debug("Parsed > {}", line);
                    return line;
                })
                .scan(new LogAcc(), (acc, line) -> {
                    // count each line from the log file (8)
                    Long lineNo = acc.lineNo + 1;
                    // extract the date timestamp from the log line (9)
                    Long date = DateTimeExtractor.extractDate(line);
                    // create a `LogLine` record (10)
                    LogLine logLine = new LogLine(line, lineNo, date, filename, directory);
                    return new LogAcc(lineNo, Optional.of(logLine));
                })
                .mapConcat(logAcc -> {
                    if (logAcc.logLine.isPresent()) {
                        return Collections.singletonList(logAcc.logLine.get());
                    } else {
                        return Collections.emptyList();
                    }
                });
    }

    // #tail-logs

    // #es-index-flow

    private Flow<LogLine, LogLine, NotUsed> elasticsearchIndexFlow() {
        return Flow.<LogLine>create()
                // create an ES index wrapper message for `LogLine` (11)
                .map(WriteMessage::createIndexMessage)
                // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
                // takes an `ObjectMapper` for `LogLine` serialization
                .via(ElasticsearchFlow.create(
                        ElasticsearchParams.V5(indexName, typeName),
                        ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
                        JsonMappers.mapper))
                .map(writeResult -> {
                    writeResult
                            .getError()
                            .ifPresent(errorJson -> {
                                throw new RuntimeException("Elasticsearch update failed "
                                        + writeResult.getErrorReason().orElse(errorJson));
                            });
                    return writeResult.message().source().get();
                });
    }

    // #es-index-flow

    // #summarize-log-stats-flow

    private Flow<LogLine, HashMap<Pair<String, String>, LogFileSummary>, NotUsed> summarizeLogStatsFlow() {
        return Flow.<LogLine>create()
                // track statistics per log file (13)
                .scan(new HashMap<>(), (summaries, logLine) -> {
                    Pair<String, String> key = Pair.create(logLine.directory, logLine.filename);
                    Long timestamp = RunOps.now();
                    if (summaries.containsKey(key)) {
                        LogFileSummary summary = summaries.get(key);
                        LogFileSummary newSummary = new LogFileSummary(summary.directory, summary.filename, summary.firstSeen, timestamp, logLine.lineNo);
                        summaries.put(key, newSummary);
                    } else {
                        LogFileSummary newSummary = new LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo);
                        summaries.put(key, newSummary);
                    }
                    return summaries;
                });
    }

    // #summarize-log-stats-flow

    // #query-elasticsearch

    private CompletionStage<List<LogLine>> queryAllRecordsFromElasticsearch(String indexName) {
        CompletionStage<List<LogLine>> reading =
                // use Alpakka Elasticsearch to return all entries from the provided index (14)
                ElasticsearchSource
                        .typed(
                                ElasticsearchParams.V5(indexName, typeName),
                                "{\"match_all\": {}}",
                                ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5),
                                LogLine.class)
                        .map(ReadResult::source)
                        .runWith(Sink.seq(), system);
        reading.thenAccept(non -> log.info("Reading finished"));
        return reading;
    }

    // #query-elasticsearch

    private void printResults(List<LogLine> results, Map<Pair<String, String>, LogFileSummary> summaries) {
        results.stream().forEach(result -> log.debug("Results < {}", result.toString()));

        String fmt = "%-32s%-32s%-16s%-16s%s";
        String header = String.format(fmt, "Directory", "File", "First Seen", "Last Updated", "Number of Lines");
        String summariesStr = summaries
                .values()
                .stream()
                .map(s -> String.format(fmt, s.directory, s.filename, s.firstSeen, s.lastUpdated, s.numberOfLines))
                .collect(Collectors.joining("\n"));

        log.info("LogFileSummaries:\n{}\n{}", header, summariesStr);
    }

    private CompletionStage<Done> run() throws Exception {
        this.system = ActorSystem.create(Behaviors.empty(), "FileToElasticSearch");
        // #stream-composing

        // compose stream together starting with the `DirectoryChangesSource` (15)
        RunnableGraph<Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>>> graph = directoryChangesSource()
                // create `FileTailSource` sub-streams
                .via(tailNewLogs())
                // merge the sub-streams together and emit all file `LogLine` records downstream
                .flatMapMerge(streamParallelism, source -> source)
                // index into Elasticsearch
                .via(elasticsearchIndexFlow())
                // summarize log statistics
                .via(summarizeLogStatsFlow())
                // create a `KillSwitch` so we can shut down the stream from the outside. use this as the materialized value.
                .viaMat(KillSwitches.single(), Keep.right())
                // materialize the last recorded log stats summarization.
                // return both a `UniqueKillSwitch` and a `CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>`
                .toMat(Sink.last(), Keep.both());

        // #stream-composing

        // #running-the-app

        RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

        // run the graph and capture the materialized values (16)
        Pair<UniqueKillSwitch, CompletionStage<HashMap<Pair<String, String>, LogFileSummary>>> running = graph.run(system);
        UniqueKillSwitch control = running.first();
        CompletionStage<HashMap<Pair<String, String>, LogFileSummary>> stream = running.second();

        RunOps.copyTestDataTo(testDataPath, inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

        log.info("Running index stream for ", streamRuntime.toString());
        Thread.sleep(streamRuntime.toMillis());
        log.info("Shutting down index stream");
        control.shutdown();
        log.info("Wait for index stream to shutdown");
        Map<Pair<String, String>, LogFileSummary> summaries = stream.toCompletableFuture().get(10, TimeUnit.SECONDS);

        // run a new graph to query all records from Elasticsearch and get the results (17)
        List<LogLine> results = queryAllRecordsFromElasticsearch(indexName).toCompletableFuture().get(10, TimeUnit.SECONDS);

        printResults(results, summaries);

        RunOps.deleteAllFilesFrom(inputLogsPath, system).toCompletableFuture().get(10, TimeUnit.SECONDS);

        return RunOps.shutdown(system);

        // #running-the-app
    }

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        CompletionStage<Done> run = main.run();

        run.toCompletableFuture().get(10, TimeUnit.SECONDS);
    }
}
Scala
/*
 * Copyright (C) 2016-2024 Lightbend Inc. <https://www.lightbend.com>
 */

package samples.scaladsl

import java.nio.file._
import akka.NotUsed
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.elasticsearch.scaladsl.{ElasticsearchFlow, ElasticsearchSource}
import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.{DirectoryChangesSource, FileTailSource}
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import samples.common.DateTimeExtractor
import samples.scaladsl.LogFileSummary.LogFileSummaries

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object Main extends App {

  import RunOps._
  import JsonFormats._

  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "FileToElasticSearch")
  implicit val executionContext: ExecutionContext = actorSystem.executionContext

  val streamRuntime = 10.seconds
  val streamParallelism = 47

  val connectionSettings = ElasticsearchConnectionSettings.create(RunOps.elasticsearchAddress)

  val indexName = "logs"
  val typeName = "_doc"

  // #directory-change-source

  val testDataPath = "./test-data"
  val inputLogsPath = s"$testDataPath/input" // watch directory (4)

  val fs = FileSystems.getDefault
  val directoryChangesSource: Source[(Path, DirectoryChange), NotUsed] =
    DirectoryChangesSource(fs.getPath(inputLogsPath), pollInterval = 1.second, maxBufferSize = 1000) // source (3)

  // #directory-change-source

  // #tail-logs

  val tailNewLogs: Flow[(Path, DirectoryChange), Source[LogLine, NotUsed], NotUsed] =
    Flow[(Path, DirectoryChange)]
      // only watch for file creation events (5)
      .collect { case (path, DirectoryChange.Creation) => path }
      .map { path =>
        log.info("File create detected: {}", path)
        // create a new `FileTailSource` and return it as a sub-stream (6)
        fileTailSource(path)
      }

  def fileTailSource(path: Path): Source[LogLine, NotUsed] = {
    val filename = path.getFileName.toString
    val directory = path.getParent.toString

    // create `FileTailSource` for a given `path` (7)
    FileTailSource
      .lines(
        path = path,
        maxLineSize = 8192,
        pollingInterval = 250.millis
      )
      .map { line =>
        log.debug("Parsed > {}", line)
        line
      }
      .scan(LogAcc()) { (acc, line) =>
        // count each line from the log file (8)
        val lineNo = acc.lineNo + 1
        // extract the date timestamp from the log line (9)
        val date = DateTimeExtractor.extractDate(line)
        LogAcc(lineNo,
          // create a `LogLine` record (10)
          Some(LogLine(line, lineNo, date, filename, directory))
        )
      }
      .mapConcat(_.logLine.toList)
  }

  // #tail-logs

  // #es-index-flow

  val elasticsearchIndexFlow: Flow[LogLine, LogLine, NotUsed] =
    Flow[LogLine]
      // create an ES index wrapper message for `LogLine` (11)
      .map(WriteMessage.createIndexMessage[LogLine])
      // use Alpakka Elasticsearch to create a new `LogLine` record. (12)
      // implicitly takes the `JsonFormat` for `LogLine` serialization
      .via(ElasticsearchFlow.create(ElasticsearchParams.V5(indexName, typeName),
        ElasticsearchWriteSettings.create(connectionSettings).withApiVersion(ApiVersion.V5)))
      .mapConcat { writeResult =>
        writeResult.error.foreach { errorJson =>
          throw new RuntimeException(s"Elasticsearch index failed ${writeResult.errorReason.getOrElse(errorJson)}")
        }
        writeResult.message.source.toList
      }

  // #es-index-flow

  // #summarize-log-stats-flow

  val summarizeLogStatsFlow: Flow[LogLine, LogFileSummaries, NotUsed] =
    Flow[LogLine]
      // track statistics per log file (13)
      .scan(Map[(String, String), LogFileSummary]()) { (summaries, logLine) =>
        val key = (logLine.directory, logLine.filename)
        val timestamp = now()
        val summary = summaries
          .get(key)
          .map(_.copy(lastUpdated = timestamp, numberOfLines = logLine.lineNo))
          .getOrElse(LogFileSummary(logLine.directory, logLine.filename, timestamp, timestamp, logLine.lineNo))
        summaries + (key -> summary)
      }

  // #summarize-log-stats-flow

  // #query-elasticsearch

  def queryAllRecordsFromElasticsearch(indexName: String): Future[immutable.Seq[LogLine]] = {
    val reading = ElasticsearchSource
      // use Alpakka Elasticsearch to return all entries from the provided index (14)
      .typed[LogLine](ElasticsearchParams.V5(indexName, typeName), """{"match_all": {}}""",
        ElasticsearchSourceSettings.create(connectionSettings).withApiVersion(ApiVersion.V5))
      .map(_.source)
      .runWith(Sink.seq)
    reading.foreach(_ => log.info("Reading finished"))
    reading
  }

  // #query-elasticsearch

  def runStreamForAwhileAndShutdown(waitInterval: FiniteDuration,
                                    control: UniqueKillSwitch,
                                    stream: Future[LogFileSummaries])(implicit mat: Materializer): Future[LogFileSummaries] = {
    log.info("Running index stream for {}", waitInterval)
    Thread.sleep(waitInterval.toMillis)
    log.info("Shutting down index stream")
    control.shutdown()
    log.info("Wait for index stream to shutdown")
    stream
  }

  def printResults(results: Seq[LogLine], summaries: LogFileSummaries): Unit = {
    results.foreach(m => log.debug("Results < {}", m))

    val fmt = "%-32s%-32s%-16s%-16s%s"
    val header = fmt.format("Directory", "File", "First Seen", "Last Updated", "Number of Lines")
    val summariesStr = summaries.values.map { summary =>
      import summary._
      fmt.format(directory, filename, firstSeen, lastUpdated, numberOfLines)
    }.mkString("\n")

    log.info("LogFileSummaries:\n{}\n{}", header, summariesStr)
  }

  // #stream-composing

  // compose stream together starting with the `DirectoryChangesSource` (15)
  val graph: RunnableGraph[(UniqueKillSwitch, Future[LogFileSummaries])] = directoryChangesSource
    // create `FileTailSource` sub-streams
    .via(tailNewLogs)
    // merge the sub-streams together and emit all file `LogLine` records downstream
    .flatMapMerge(streamParallelism, identity)
    // index into Elasticsearch
    .via(elasticsearchIndexFlow)
    // summarize log statistics
    .via(summarizeLogStatsFlow)
    // create a `KillSwitch` so we can shut down the stream from the outside. use this as the materialized value.
    .viaMat(KillSwitches.single)(Keep.right)
    // materialize the last recorded log stats summarization.
    // return both a `UniqueKillSwitch` and a `Future[LogFileSummaries]`
    .toMat(Sink.last)(Keep.both)

  // #stream-composing

  // #running-the-app

  val run = for {
    _ <-                deleteAllFilesFrom(inputLogsPath)
    // run the graph and capture the materialized values (16)
    (control, stream) = graph.run()
    _ <-                copyTestDataTo(testDataPath, inputLogsPath)
    summaries <-        runStreamForAwhileAndShutdown(streamRuntime, control, stream)
    // run a new graph to query all records from Elasticsearch and get the results (17)
    results <-          queryAllRecordsFromElasticsearch(indexName)
    _ =                 printResults(results, summaries)
    _ <-                deleteAllFilesFrom(inputLogsPath)
    _ <-                shutdown(actorSystem)
  } yield ()

  Await.result(run, streamRuntime + 20.seconds)

  // #running-the-app
}

LogLine

Java
package samples.javadsl;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.util.Optional;

// #logline

// Type in Elasticsearch (2)
public class LogLine {
    public final String line;
    public final Long lineNo;
    public final Long date;
    public final String filename;
    public final String directory;

    @JsonCreator
    public LogLine(
            @JsonProperty("line") String line,
            @JsonProperty("lineNo") Long lineNo,
            @JsonProperty("date") Long date,
            @JsonProperty("filename") String filename,
            @JsonProperty("directory") String directory) {
        this.line = line;
        this.lineNo = lineNo;
        this.date = date;
        this.filename = filename;
        this.directory = directory;
    }

    @Override
    public java.lang.String toString() {
        return "LogLine(line=\"" + line + "\", lineNo=" + lineNo.toString() + ", date=" + date.toString() +
                ", filename=\"" + filename + "\", directory=\"" + directory + "\")";
    }
}

class JsonMappers {
    // Jackson conversion setup (3)
    public final static ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());
    public final static ObjectWriter logLineWriter = mapper.writerFor(LogLine.class);
    public final static ObjectReader logLineReader = mapper.readerFor(LogLine.class);
}

// #logline

class LogAcc {
    public final Long lineNo;
    public final Optional<LogLine> logLine;

    public LogAcc() {
        this.lineNo = 0L;
        this.logLine = Optional.empty();
    }

    public LogAcc(Long lineNo, Optional<LogLine> logLine) {
        this.lineNo = lineNo;
        this.logLine = logLine;
    }
}

class LogFileSummary {
    public final String directory;
    public final String filename;
    public final Long firstSeen;
    public final Long lastUpdated;
    public final Long numberOfLines;

    public LogFileSummary(String directory, String filename, Long firstSeen, Long lastUpdated, Long numberOfLines) {
        this.directory = directory;
        this.filename = filename;
        this.firstSeen = firstSeen;
        this.lastUpdated = lastUpdated;
        this.numberOfLines = numberOfLines;
    }
}
Scala
package samples.scaladsl

import spray.json.DefaultJsonProtocol._
import spray.json._

// #logline

// Type in Elasticsearch (2)
final case class LogLine(line: String, lineNo: Long, date: Long, filename: String, directory: String)

object JsonFormats {
  // Spray JSON conversion setup (3)
  implicit val logLineFormat: JsonFormat[LogLine] = jsonFormat5(LogLine)
}

// #logline

final case class LogAcc(lineNo: Long = 0L, logLine: Option[LogLine] = None)

object LogFileSummary {
  type LogFileSummaries = Map[(String, String), LogFileSummary]
}

final case class LogFileSummary(directory: String, filename: String, firstSeen: Long, lastUpdated: Long, numberOfLines: Long)
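
As a quick illustration, here is a minimal sketch (not part of the sample sources) of round-tripping a `LogLine` through the spray-json format defined in `JsonFormats`; the field values are arbitrary.

import samples.scaladsl.LogLine
import samples.scaladsl.JsonFormats._
import spray.json._

object LogLineJsonDemo extends App {
  // an illustrative LogLine record; all field values are made up for the demo
  val logLine = LogLine("2019-09-20 21:18:24,774 INFO started", 1L, 1568998704774L, "app.log", "./test-data/input")
  // serialize via the implicit logLineFormat
  val json: String = logLine.toJson.compactPrint
  // parse the JSON string back into a LogLine
  val parsed: LogLine = json.parseJson.convertTo[LogLine]
  assert(parsed == logLine)
  println(json)
}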

RunOps

Java
package samples.javadsl
import java.nio.file.Path
import java.util.concurrent.CompletionStage

import akka.Done
import akka.actor.typed.ActorSystem
import samples.scaladsl

import scala.compat.java8.FutureConverters._

object RunOps {
  val elasticsearchAddress: String = scaladsl.RunOps.elasticsearchAddress

  def stopContainers(): Unit = scaladsl.RunOps.stopContainers()

  def now(): Long = scaladsl.RunOps.now()

  def listFiles(path: String)(implicit system: ActorSystem[_]): CompletionStage[Seq[Path]] = {
    scaladsl.RunOps.listFiles(path).toJava
  }

  def copyTestDataTo(source: String, destination: String)(implicit system: ActorSystem[_]): CompletionStage[Unit] = {
    scaladsl.RunOps.copyTestDataTo(source, destination).toJava
  }

  def deleteAllFilesFrom(path: String)(implicit system: ActorSystem[_]): CompletionStage[Unit] = {
    scaladsl.RunOps.deleteAllFilesFrom(path).toJava
  }

  def shutdown(actorSystem: ActorSystem[_]): CompletionStage[Done] = {
    scaladsl.RunOps.shutdown(actorSystem).toJava
  }
}
Scala
package samples.scaladsl

import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.time.ZonedDateTime
import akka.Done
import akka.actor.typed.ActorSystem
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.{Keep, Sink}
import org.slf4j.LoggerFactory
import org.testcontainers.elasticsearch.ElasticsearchContainer

import scala.concurrent.Future

object RunOps {
  final val log = LoggerFactory.getLogger(getClass)

  // Testcontainers: start Elasticsearch in Docker
  private val elasticsearchContainer = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch-oss:7.10.2")
  elasticsearchContainer.start()
  private[samples] val elasticsearchAddress: String = "http://" + elasticsearchContainer.getHttpHostAddress

  def stopContainers(): Unit = {
    elasticsearchContainer.stop()
  }

  def now(): Long = ZonedDateTime.now.toInstant.toEpochMilli

  def listFiles(path: String)(implicit system: ActorSystem[_]): Future[Seq[Path]] =
    Directory.ls(Paths.get(path)).filterNot(Files.isDirectory(_)).toMat(Sink.seq)(Keep.right).run()

  def copyTestDataTo(source: String, destination: String)(implicit system: ActorSystem[_]): Future[Unit] = {
    implicit val ec = system.executionContext
    for {
      sourceFiles <- listFiles(source)
    } yield sourceFiles foreach { sourceFile =>
      val destFile = Paths.get(destination, sourceFile.getFileName.toString)
      log.info(s"Copying file $sourceFile to $destFile")
      Files.copy(sourceFile, destFile, StandardCopyOption.REPLACE_EXISTING)
    }
  }

  def deleteAllFilesFrom(path: String)(implicit system: ActorSystem[_]): Future[Unit] = {
    implicit val ec = system.executionContext
    for {
      files <- listFiles(path)
    } yield files filterNot (_.getFileName.toString == ".gitignore") foreach { file =>
      log.info(s"Deleting file: $file")
      Files.delete(file)
    }
  }

  def shutdown(system: ActorSystem[_]): Future[Done] = {
    log.info(s"Stop containers")
    stopContainers()
    log.info(s"Kill actor system")
    system.terminate()
    system.whenTerminated
  }
}

DateTimeExtractor

package samples.common

import java.time.format.DateTimeFormatter
import java.time.{LocalDateTime, ZoneOffset, ZonedDateTime}

import scala.util.matching.Regex

object DateTimeExtractor {
  val dateTimeExtractors = List(new ZonedDateTimeExtractor, new LocalDateTimeExtractor)

  def extractDate(line: String): Long = {
    dateTimeExtractors
      .view
      .map(_.maybeParse(line))
      .collectFirst {
        case Some(d) => d
      }
      .getOrElse(-1L)
  }

  sealed trait DateTimeExtractor {
    def regex: Regex
    def parse(dateStr: String): Long
    def maybeParse(dateStr: String): Option[Long] = {
      val matched: Option[String] = regex.findFirstIn(dateStr)
      matched.map(parse)
    }
  }

  /**
    * ZonedDateTime
    * Ex)
    * 2016-01-19T15:21:32.59+02:00
    * https://regex101.com/r/LYluKk/4
    */
  final class ZonedDateTimeExtractor extends DateTimeExtractor {
    val regex: Regex = """((?:(\d{4}-\d{2}-\d{2})[T| ](\d{2}:\d{2}:\d{2}(?:\.\d+)?))(Z|[\+-]\d{2}:\d{2})+)""".r
    def parse(dateStr: String): Long = ZonedDateTime.parse(dateStr, DateTimeFormatter.ISO_ZONED_DATE_TIME).toInstant.toEpochMilli
  }

  /**
    * LocalDateTime
    * Ex)
    * 2019-09-20 21:18:24,774
    * https://regex101.com/r/LYluKk/3
    */
  final class LocalDateTimeExtractor extends DateTimeExtractor {
    private val pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd kk:mm:ss,SSS")
    val regex: Regex = """((?:(\d{4}-\d{2}-\d{2})[T| ](\d{2}:\d{2}:\d{2}(?:\.\d+)?)),(\d{3}?))""".r
    def parse(dateStr: String): Long = LocalDateTime.parse(dateStr, pattern).toInstant(ZoneOffset.UTC).toEpochMilli
  }
}
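
As a reference point, a minimal sketch (not part of the sample sources) of how `DateTimeExtractor.extractDate` behaves on the two timestamp formats documented above; lines without a recognizable timestamp fall back to `-1L`.

import samples.common.DateTimeExtractor

object DateTimeExtractorDemo extends App {
  // ISO-8601 ZonedDateTime, handled by ZonedDateTimeExtractor
  println(DateTimeExtractor.extractDate("2016-01-19T15:21:32.59+02:00 INFO starting up"))
  // LocalDateTime with a millisecond suffix, handled by LocalDateTimeExtractor (interpreted as UTC)
  println(DateTimeExtractor.extractDate("2019-09-20 21:18:24,774 DEBUG polling directory"))
  // no timestamp in the line: extractDate returns -1
  println(DateTimeExtractor.extractDate("no timestamp on this line"))
}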