Full source

The full example contains code to run an embedded Apache FTP server with a virtual file system.

Java
sourcepackage samples.javadsl;

// #imports
import java.net.InetAddress;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.FileSystem;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.japi.function.Creator;
import akka.japi.function.Function;
import akka.stream.IOResult;
import akka.stream.alpakka.file.javadsl.Directory;
import akka.stream.alpakka.file.javadsl.LogRotatorSink;
import akka.stream.alpakka.ftp.javadsl.Sftp;

import akka.stream.alpakka.ftp.FtpCredentials;
import akka.stream.alpakka.ftp.SftpIdentity;
import akka.stream.alpakka.ftp.KeyFileSftpIdentity;
import akka.stream.alpakka.ftp.SftpSettings;
import akka.stream.javadsl.Compression;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.util.ByteString;

import org.apache.mina.util.AvailablePortFinder;
import playground.filesystem.FileSystemMock;
import playground.SftpServerEmbedded;

import akka.stream.javadsl.Sink;
// #imports

public class Main {

    private void run() throws IOException {
        final ActorSystem<Void> actorSystem = ActorSystem.create(Behaviors.empty(), "RotateLogsToFtp");

        final FileSystem ftpFileSystem = new FileSystemMock().fileSystem;

        final String privateKeyPassphrase = new String(SftpServerEmbedded.clientPrivateKeyPassphrase());
        final String pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile();
        final String username = "username";
        final String password = username;
        final String hostname = "localhost";

        int port = AvailablePortFinder.getNextAvailable(21_000);

        Path home = ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir()).resolve("tmp");
        if (!Files.exists(home)) Files.createDirectories(home);

        SftpServerEmbedded.start(ftpFileSystem, port);

        // #sample
        Iterator<ByteString> data =
                Arrays.asList('a', 'b', 'c', 'd').stream()
                        .map(
                                e -> {
                                    char[] arr = new char[100];
                                    Arrays.fill(arr, e);
                                    return ByteString.fromString(String.valueOf(arr));
                                })
                        .iterator();

        // (2)
        Creator<Function<ByteString, Optional<String>>> rotator =
                () -> {
                    final char[] last = {' '};
                    return (bs) -> {
                        char c = (char) bs.head();
                        if (c != last[0]) {
                            last[0] = c;
                            return Optional.of("log-" + c + ".z");
                        } else {
                            return Optional.empty();
                        }
                    };
                };

        // (3)
        KeyFileSftpIdentity identity =
                SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase.getBytes());
        SftpSettings settings =
                SftpSettings.create(InetAddress.getByName(hostname))
                        .withPort(port)
                        .withSftpIdentity(identity)
                        .withStrictHostKeyChecking(false)
                        .withCredentials(FtpCredentials.create(username, password));

        Function<String, Sink<ByteString, CompletionStage<IOResult>>> sink =
                path -> {
                    Sink<ByteString, CompletionStage<IOResult>> ftpSink = Sftp.toPath("tmp/" + path, settings);
                    return Flow.<ByteString>create()
                            .via(Compression.gzip()) // (4)
                            .toMat(ftpSink, Keep.right());
                };

        CompletionStage<Done> completion =
                Source.fromIterator(() -> data)
                        .runWith(LogRotatorSink.withSinkFactory(rotator, sink), actorSystem);
        // #sample

        completion
                .thenApply(
                        (i) ->
                                Directory.ls(home)
                                        .runForeach((f) -> System.out.println(f.toString()), actorSystem))
                .whenComplete(
                        (res, ex) -> {
                            if (ex != null) {
                                ex.printStackTrace();
                            }
                            actorSystem.terminate();
                            actorSystem.getWhenTerminated().thenAccept(t -> SftpServerEmbedded.stopServer());
                        });
    }

    public static void main(String[] args) throws IOException {
        new Main().run();
    }
}
Scala
sourcepackage samples.scaladsl

// #imports

import java.net.InetAddress
import java.nio.file.{Files, Path}

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.alpakka.file.scaladsl.{Directory, LogRotatorSink}
import akka.stream.alpakka.ftp.scaladsl.Sftp
import akka.stream.alpakka.ftp.{FtpCredentials, SftpIdentity, SftpSettings}
import akka.stream.scaladsl.{Compression, Flow, Keep, Source}
import akka.util.ByteString
import org.apache.mina.util.AvailablePortFinder
import playground.SftpServerEmbedded
import playground.filesystem.FileSystemMock

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext}
// #imports

object Main extends App {
  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "RotateLogsToFtp")
  implicit val executionContext: ExecutionContext = actorSystem.executionContext

  def wait(duration: FiniteDuration): Unit = Thread.sleep(duration.toMillis)


  private val ftpFileSystem = new FileSystemMock().fileSystem
  private val privateKeyPassphrase = SftpServerEmbedded.clientPrivateKeyPassphrase
  private val pathToIdentityFile = SftpServerEmbedded.clientPrivateKeyFile
  private val username = "username"
  private val password = username
  private val hostname = "localhost"
  val port = AvailablePortFinder.getNextAvailable(21000)

  val home: Path = ftpFileSystem.getPath(SftpServerEmbedded.FtpRootDir).resolve("tmp")
  if (!Files.exists(home)) Files.createDirectories(home)

  SftpServerEmbedded.start(ftpFileSystem, port)

  // #sample
  val data = ('a' to 'd') // (1)
    .flatMap(letter => Seq.fill(10)(ByteString(letter.toString * 10000)))

  // (2)
  val rotator = () => {
    var last: Char = ' '
    (bs: ByteString) => {
      bs.head.toChar match {
        case char if char != last =>
          last = char
          Some(s"log-$char.z")
        case _ => None
      }
    }
  }

  // (3)
  val identity = SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase)
  val credentials = FtpCredentials.create(username, password)
  val settings = SftpSettings(InetAddress.getByName(hostname))
    .withPort(port)
    .withSftpIdentity(identity)
    .withStrictHostKeyChecking(false)
    .withCredentials(credentials)

  val sink = (path: String) =>
    Flow[ByteString]
      .via(Compression.gzip) // (4)
      .toMat(Sftp.toPath(s"tmp/$path", settings))(Keep.right)

  val completion = Source(data).runWith(LogRotatorSink.withSinkFactory(rotator, sink))
  // #sample

  completion
    .flatMap { _ =>
      Directory
        .ls(home)
        .runForeach(f => println(f))
    }
    .recover {
      case f =>
        f.printStackTrace()
    }
    .onComplete { _ =>
      actorSystem.terminate()
      actorSystem.getWhenTerminated.thenAccept(_ => SftpServerEmbedded.stopServer());
    }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.