Alpakka sample
Download all files from an FTP server to local files
This example uses Alpakka FTP to read from the FTP server and stores the files using Akka Stream FileIO.
Browse the sources at GitHub.
To try out this project, clone the Alpakka Samples repository and find it in the alpakka-sample-ftp-to-file directory.
Dependencies
- Dependencies (sbt notation)

    val AkkaVersion = "2.7.0"
    val AlpakkaVersion = "6.0.1"
    val AkkaDiagnosticsVersion = "2.0.0-M4"

    "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
    "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % AlpakkaVersion,
    "com.typesafe.akka" %% "akka-stream" % AkkaVersion,
    "com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
    "com.typesafe.akka" %% "akka-actor" % AkkaVersion,
    "com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion,
Imports
- Java

    import akka.stream.alpakka.ftp.FtpSettings;
    import akka.stream.alpakka.ftp.javadsl.Ftp;
    import akka.stream.javadsl.FileIO;
    import akka.stream.javadsl.Sink;
- Scala

    import akka.stream.alpakka.ftp.FtpSettings
    import akka.stream.alpakka.ftp.scaladsl.Ftp
    import akka.stream.scaladsl.{FileIO, Sink}
    import akka.stream.IOResult
Flow
- list the FTP server contents (1),
- keep only file entries (2),
- for each file, prepare to await the Future (Scala) or CompletionStage (Java) result, ignoring the stream order (3),
- run a new stream copying the file contents to a local file (4),
- combine the filename and the copying result (5),
- collect all filenames with results into a sequence (6)
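Step (3) means that results are passed downstream as each download finishes, not in the order the files were listed. The following is a JDK-only analogy of that behaviour, not Akka Streams code: the class `UnorderedCompletionSketch`, the method `completionOrder`, and the file names are hypothetical, and the futures are completed by hand to stand in for downloads that finish at different times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class UnorderedCompletionSketch {

    // Registers one pending future per file name, completes them in the
    // given order, and returns the names in the order their results arrived.
    static List<String> completionOrder(List<String> names, List<Integer> finishOrder) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            CompletableFuture<String> download = new CompletableFuture<>();
            // The callback runs when this particular future completes.
            download.thenAccept(emitted::add);
            pending.add(download);
        }
        for (int index : finishOrder) {
            // Hypothetical stand-in for "the download of this file has finished".
            pending.get(index).complete(names.get(index));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> listed = List.of("a.txt", "b.txt", "c.txt");
        // Pretend c.txt finishes first, then a.txt, then b.txt.
        System.out.println(completionOrder(listed, List.of(2, 0, 1)));
    }
}
```

With the finish order above, the names come out as c.txt, a.txt, b.txt. This is why step (5) pairs each result with its filename: once ordering is given up, the position in the output no longer identifies the file.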
- Java

    final FtpSettings ftpSettings =
        FtpSettings.create(InetAddress.getByName("localhost")).withPort(port);
    final int parallelism = 5;
    final CompletionStage<List<Pair<String, IOResult>>> fetchedFiles =
        Ftp.ls("/", ftpSettings) // : FtpFile (1)
            .filter(ftpFile -> ftpFile.isFile()) // : FtpFile (2)
            .mapAsyncUnordered(
                parallelism,
                ftpFile -> { // (3)
                  final Path localPath = targetDir.resolve("." + ftpFile.path());
                  final CompletionStage<IOResult> fetchFile =
                      Ftp.fromPath(ftpFile.path(), ftpSettings)
                          .runWith(FileIO.toPath(localPath), system); // (4)
                  return fetchFile.thenApply(
                      ioResult -> // (5)
                      Pair.create(ftpFile.path(), ioResult));
                }) // : (String, IOResult)
            .runWith(Sink.seq(), system); // (6)
- Scala

    val ftpSettings = FtpSettings(InetAddress.getByName("localhost")).withPort(port)

    val fetchedFiles: Future[immutable.Seq[(String, IOResult)]] =
      Ftp
        .ls("/", ftpSettings) //: FtpFile (1)
        .filter(ftpFile => ftpFile.isFile) //: FtpFile (2)
        .mapAsyncUnordered(parallelism = 5) { ftpFile => // (3)
          val localPath = targetDir.resolve("." + ftpFile.path)
          val fetchFile: Future[IOResult] = Ftp
            .fromPath(ftpFile.path, ftpSettings)
            .runWith(FileIO.toPath(localPath)) // (4)
          fetchFile.map { ioResult => // (5)
            (ftpFile.path, ioResult)
          }
        } //: (String, IOResult)
        .runWith(Sink.seq) // (6)
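Both variants build the local path with `targetDir.resolve("." + path)`. The JDK-only sketch below illustrates why the "." prefix matters, assuming remote paths start with "/" and a Unix-like file system: resolving an absolute path against `targetDir` would return that absolute path unchanged, so the prefix makes it relative first. The class `LocalPathSketch` and the helpers `toLocalPath` and `ensureParentExists` are hypothetical and not part of the sample. The `normalize()` call and the directory creation are additions for illustration, on the assumption that local parent directories need to exist before a file in a remote subdirectory can be written.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LocalPathSketch {

    // Maps a remote path such as "/reports/2020.csv" to a file below targetDir.
    // The "." prefix turns the remote path into a relative one before resolving;
    // normalize() then removes the redundant "." segment (not done in the sample).
    static Path toLocalPath(Path targetDir, String remotePath) {
        return targetDir.resolve("." + remotePath).normalize();
    }

    // Creates the parent directories of localPath (assumption: they may be missing).
    static Path ensureParentExists(Path localPath) throws IOException {
        Path parent = localPath.getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        return localPath;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical target directory; the sample defines its own targetDir.
        Path targetDir = Files.createTempDirectory("ftp-to-file-sketch");
        Path localPath = ensureParentExists(toLocalPath(targetDir, "/reports/2020.csv"));
        System.out.println(localPath);
        System.out.println(Files.isDirectory(localPath.getParent()));
    }
}
```

In the stream, a helper like this would be called at step (3), before `FileIO.toPath(localPath)` is run in step (4).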