Alpakka sample

Rotate data stream over to multiple compressed files on SFTP server

This example reads a stream of data and uses the Alpakka File LogRotatorSink to write it to multiple files, switching to a new file whenever a rotation function triggers. The files are gzip-compressed in-flow and written to an SFTP server with Alpakka FTP.

Browse the sources at GitHub.

To try out this project clone the Alpakka Samples repository and find it in the alpakka-sample-rotate-logs-to-ftp directory.

Dependencies

Dependencies (sbt notation)
// Versions shared by the module IDs listed below.
val AkkaVersion = "2.9.0"
val AlpakkaVersion = "7.0.2"
val AkkaDiagnosticsVersion = "2.1.0"

  // Module IDs to list in the build's library dependencies (sbt notation).
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % AlpakkaVersion,
  "com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion,

Imports

Java
source
import java.net.InetAddress;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.FileSystem;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.japi.function.Function;
import akka.stream.IOResult;
import akka.stream.alpakka.file.javadsl.Directory;
import akka.stream.alpakka.file.javadsl.LogRotatorSink;
import akka.stream.alpakka.ftp.javadsl.Sftp;

import akka.stream.alpakka.ftp.FtpCredentials;
import akka.stream.alpakka.ftp.SftpIdentity;
import akka.stream.alpakka.ftp.KeyFileSftpIdentity;
import akka.stream.alpakka.ftp.SftpSettings;
import akka.stream.javadsl.Compression;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import org.apache.mina.util.AvailablePortFinder;
import playground.filesystem.FileSystemMock;
import playground.SftpServerEmbedded;

import akka.stream.javadsl.Sink;
Scala
source
import java.net.InetAddress
import java.nio.file.{Files, Path}

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.file.scaladsl.{Directory, LogRotatorSink}
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.alpakka.ftp.{FtpCredentials, SftpIdentity, SftpSettings}
import akka.stream.scaladsl.{Compression, Flow, Keep, Source}
import akka.util.ByteString
import org.apache.mina.util.AvailablePortFinder
import playground.SftpServerEmbedded
import playground.filesystem.FileSystemMock

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
  • generate a data stream whose contents change over time (1),
  • define a function that tracks the last element and outputs a new path when the contents of the stream change (2),
  • prepare the SFTP credentials and settings (3),
  • compress the ByteStrings (4).
Scala
// (1) Sample input: for each letter 'a' to 'd', ten chunks that each repeat the letter
// 10000 times, so the first byte of the chunks changes after every ten elements.
val data = ('a' to 'd')
  .flatMap(letter => Seq.fill(10)(ByteString(letter.toString * 10000)))

// (2) Factory for the rotation function. Every call to `rotator()` returns a new function
// with its own `last` state. That function looks at the first byte of a chunk and returns
// `Some(fileName)` when it differs from the previously seen one, `None` otherwise.
val rotator = () => {
  // Mutable on purpose: remembers the leading character of the most recent rotation.
  var last: Char = ' '
  (bs: ByteString) => {
    // `headOption` instead of `head`: an empty chunk has no first byte and must not throw,
    // so it simply never triggers a rotation.
    bs.headOption.map(_.toChar) match {
      case Some(char) if char != last =>
        last = char
        Some(s"log-$char.z")
      case _ => None
    }
  }
}

// (3) SFTP connection setup: a key-file identity plus username/password credentials,
// combined into the settings used for every upload. Strict host key checking is disabled.
val identity = SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase)
val credentials = FtpCredentials.create(username, password)
val settings = {
  val serverAddress = InetAddress.getByName(hostname)
  SftpSettings(serverAddress)
    .withPort(port)
    .withCredentials(credentials)
    .withSftpIdentity(identity)
    .withStrictHostKeyChecking(false)
}

// Sink factory: for a given file name, gzip the incoming bytes and upload them to
// `tmp/<file name>` over SFTP. The materialized value is the one of the SFTP sink.
val sink = (path: String) => {
  val gzipped = Flow[ByteString].via(Compression.gzip) // (4)
  val upload = Sftp.toPath(s"tmp/$path", settings)
  gzipped.toMat(upload)(Keep.right)
}

// Run the sample data into the rotating sink, keeping the sink's materialized value.
val completion =
  Source(data)
    .toMat(LogRotatorSink.withSinkFactory(rotator, sink))(Keep.right)
    .run()
Java
// (1) Sample input: one chunk per letter 'a' to 'd', each repeating the letter 100 times,
// so the first byte changes with every element.
Iterator<ByteString> data =
        Arrays.asList('a', 'b', 'c', 'd').stream()
                .map(
                        e -> {
                            char[] arr = new char[100];
                            Arrays.fill(arr, e);
                            return ByteString.fromString(String.valueOf(arr));
                        })
                .iterator();

// (2) Creator of the rotation function. Every call creates a new function with its own
// remembered character. That function looks at the first byte of a chunk and returns a
// file name when it differs from the previously seen one, an empty Optional otherwise.
Creator<Function<ByteString, Optional<String>>> rotator =
        () -> {
            // Single-element array so the inner lambda can update the remembered character.
            final char[] last = {' '};
            return (bs) -> {
                // An empty chunk has no first byte to inspect: never rotate on it
                // instead of failing in head().
                if (bs.isEmpty()) {
                    return Optional.empty();
                }
                char c = (char) bs.head();
                if (c != last[0]) {
                    last[0] = c;
                    return Optional.of("log-" + c + ".z");
                } else {
                    return Optional.empty();
                }
            };
        };

// (3) SFTP connection setup: a key-file identity plus username/password credentials,
// combined into the settings used for every upload. Strict host key checking is disabled.
KeyFileSftpIdentity identity =
        SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase.getBytes());
InetAddress serverAddress = InetAddress.getByName(hostname);
SftpSettings settings =
        SftpSettings.create(serverAddress)
                .withPort(port)
                .withSftpIdentity(identity)
                .withStrictHostKeyChecking(false)
                .withCredentials(FtpCredentials.create(username, password));

// Sink factory: for a given file name, gzip the incoming bytes and upload them to
// "tmp/<file name>" over SFTP. The materialized value is the one of the SFTP sink.
Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
        path ->
                Flow.<ByteString>create()
                        .via(Compression.gzip()) // (4)
                        .toMat(Sftp.toPath("tmp/" + path, settings), Keep.right());

// Run the sample data into the rotating sink. The creator hands out the same `data`
// iterator instance on every call, so this source can only be consumed once.
Sink<ByteString, CompletionStage<Done>> rotatingSink =
        LogRotatorSink.withSinkFactory(rotator, sink);
CompletionStage<Done> completion =
        Source.fromIterator(() -> data).runWith(rotatingSink, actorSystem);

All Alpakka samples

Show Alpakka samples listing.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.