FTP

Artifacts

sbt
// Alpakka File and FTP connectors; `%%` appends the project's Scala binary version
// to each artifact name.
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.0-M2"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "1.0-M2"
Maven
<!-- Alpakka File connector, Scala 2.12 build (see the _2.12 artifact suffix). -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
<!-- Alpakka FTP connector, Scala 2.12 build; keep its version in step with the File connector above. -->
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ftp_2.12</artifactId>
  <version>1.0-M2</version>
</dependency>
Gradle
dependencies {
  // Alpakka File and FTP connectors, Scala 2.12 builds (see the `_2.12` artifact suffix).
  // Each `compile` line is a separate statement, so no comma may follow it.
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '1.0-M2'
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '1.0-M2'
}

Example: Copy all files from an FTP server to local files

  • list FTP server contents (1),
  • keep only the file entries (2),
  • for each file, prepare to await the `Future` (Scala) / `CompletionStage` (Java) results, ignoring the stream order (3),
  • run a new stream copying the file contents to a local file (4),
  • combine the filename and the copying result (5),
  • collect all filenames with results into a sequence (6)
Scala
import java.net.InetAddress
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, Sink}
import org.apache.mina.util.AvailablePortFinder
import playground.filesystem.FileSystemMock
import playground.{ActorSystemAvailable, FtpServerEmbedded}

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Connection settings for the FTP server on localhost, listening on `port`.
val ftpSettings = FtpSettings(InetAddress.getByName("localhost")).withPort(port)

  Ftp
    .ls("/", ftpSettings)                                    //: FtpFile (1) list everything below "/"
    .filter(_.isFile)                                        //: FtpFile (2) keep only file entries
    .mapAsyncUnordered(parallelism = 5) { remote =>          // (3) results are emitted as they complete
      // Mirror the remote path below the local target directory.
      val destination = targetDir.resolve("." + remote.path)
      Ftp
        .fromPath(remote.path, ftpSettings)
        .runWith(FileIO.toPath(destination))                 // (4) nested stream: remote file -> local file
        .map(copyResult => remote.path -> copyResult)        // (5) pair the path with its copy result
    }                                                        //: (String, IOResult)
    .runWith(Sink.seq)                                       // (6) collect all pairs into a sequence
Java
import akka.japi.Pair;
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Sink;
import org.apache.ftpserver.FtpServer;
import org.apache.mina.util.AvailablePortFinder;
import playground.ActorSystemAvailable;
import playground.FtpServerEmbedded;
import playground.filesystem.FileSystemMock;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Connection settings for the FTP server on localhost, listening on `port`.
final FtpSettings ftpSettings =
    FtpSettings.create(InetAddress.getByName("localhost")).withPort(port);
// Parallelism passed to mapAsyncUnordered below.
final Integer parallelism = 5;

    Ftp.ls("/", ftpSettings) // : FtpFile (1) list everything below "/"
        .filter(entry -> entry.isFile()) // : FtpFile (2) keep only file entries
        .mapAsyncUnordered(
            parallelism,
            entry -> { // (3) results are emitted as they complete
              final String remotePath = entry.path();
              // Mirror the remote path below the local target directory.
              final Path destination = targetDir.resolve("." + remotePath);
              return Ftp.fromPath(remotePath, ftpSettings)
                  .runWith(FileIO.toPath(destination), actorMaterializer()) // (4)
                  .thenApply(copyResult -> Pair.create(remotePath, copyResult)); // (5)
            }) // : (String, IOResult)
        .runWith(Sink.seq(), actorMaterializer()); // (6) collect all pairs into a list

Running the example code

This example is contained in a stand-alone runnable main; it can be run from sbt like this:

Scala
sbt
> doc-examples/runMain ftpsamples.FtpToFile
Java
sbt
> doc-examples/runMain ftpsamples.FtpToFileExample
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.