Alpakka sample

Download all files from an FTP server to local files

This example uses Alpakka FTP to read from the FTP server, and stores files using Akka Stream FileIO.

Browse the sources at GitHub.

To try out this project clone the Alpakka Samples repository and find it in the alpakka-sample-ftp-to-file directory.

Dependencies

Dependencies (sbt notation)
// Versions shared by the dependency declarations below.
val AkkaVersion = "2.7.0"
val AlpakkaVersion = "6.0.1"
val AkkaDiagnosticsVersion = "2.0.0-M4"

// Each artifact is listed once; akka-stream previously appeared twice.
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
  "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % AlpakkaVersion,
  "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
  "com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion
)

Imports

Java
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Sink;
Scala
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, Sink}
import akka.stream.IOResult

Flow

  • list FTP server contents (1),
  • just bother about file entries (2),
  • for each file, prepare to await the Future (Scala) / CompletionStage (Java) result, ignoring the stream order (3),
  • run a new stream copying the file contents to a local file (4),
  • combine the filename and the copying result (5),
  • collect all filenames with results into a sequence (6)
Java
sourcefinal FtpSettings ftpSettings =
        FtpSettings.create(InetAddress.getByName("localhost")).withPort(port);
final int parallelism = 5;

final CompletionStage<List<Pair<String, IOResult>>> fetchedFiles =
        Ftp.ls("/", ftpSettings) // : FtpFile (1)
                .filter(ftpFile -> ftpFile.isFile()) // : FtpFile (2)
                .mapAsyncUnordered(
                        parallelism,
                        ftpFile -> { // (3)
                            final Path localPath = targetDir.resolve("." + ftpFile.path());
                            final CompletionStage<IOResult> fetchFile =
                                    Ftp.fromPath(ftpFile.path(), ftpSettings)
                                            .runWith(FileIO.toPath(localPath), system); // (4)
                            return fetchFile.thenApply(
                                    ioResult -> // (5)
                                            Pair.create(ftpFile.path(), ioResult));
                        }) // : (String, IOResult)
                .runWith(Sink.seq(), system); // (6)
Scala
// Copies every file listed on the FTP server into targetDir and collects one
// (remote path, IOResult) pair per file.
// `port` and `targetDir` are defined outside this excerpt; `runWith` and
// `Future.map` also rely on implicits (materializer/actor system and an
// ExecutionContext) that are presumably provided by the enclosing scope.
val ftpSettings = FtpSettings(InetAddress.getByName("localhost")).withPort(port)

val fetchedFiles: Future[immutable.Seq[(String, IOResult)]] =
  Ftp
    .ls("/", ftpSettings)                                    //: FtpFile (1)
    .filter(ftpFile => ftpFile.isFile)                       //: FtpFile (2)
    .mapAsyncUnordered(parallelism = 5) { ftpFile =>         // (3)
      // Prefixing "." keeps the remote path relative when resolving, so a path
      // starting with "/" presumably ends up under targetDir rather than replacing it.
      val localPath = targetDir.resolve("." + ftpFile.path)
      // Each file is copied by its own inner stream.
      val fetchFile: Future[IOResult] = Ftp
        .fromPath(ftpFile.path, ftpSettings)
        .runWith(FileIO.toPath(localPath))                   // (4)
      // Keep the remote path next to its copy result.
      fetchFile.map { ioResult =>                            // (5)
        (ftpFile.path, ioResult)
      }
    }                                                        //: (String, IOResult)
    .runWith(Sink.seq)                                       // (6)

All Alpakka samples

Show Alpakka samples listing.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.