FTP

The FTP connector provides Akka Stream sources to connect to FTP, FTPs and SFTP servers. Currently, two kinds of sources are provided:

  • one for browsing or traversing the server recursively and,
  • another for retrieving files as a stream of bytes.
Project Info: Alpakka FTP
Artifact
com.lightbend.akka
akka-stream-alpakka-ftp
2.0.0-RC1
JDK versions
Adopt OpenJDK 8
Adopt OpenJDK 11
Scala versions2.12.10, 2.11.12, 2.13.1
JPMS module nameakka.stream.alpakka.ftp
License
Readiness level
Since 0.3, 2016-12-02
Home pagehttps://doc.akka.io/docs/alpakka/current
API documentation
Forums
Release notesIn the documentation
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

sbt
val AkkaVersion = "2.5.30"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "2.0.0-RC1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.5.30</akka.version>
</properties>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ftp_2.12</artifactId>
  <version>2.0.0-RC1</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-stream_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
Gradle
versions += [
  AkkaVersion: "2.5.30"
]
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '2.0.0-RC1',
  compile group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: versions.AkkaVersion
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.hierynomussshj0.27.0
com.typesafe.akkaakka-stream_2.122.5.30
commons-netcommons-net3.6
org.scala-langscala-library2.12.10
Dependency tree
com.hierynomus    sshj    0.27.0
    com.jcraft    jzlib    1.1.3
    net.i2p.crypto    eddsa    0.2.0
    org.bouncycastle    bcpkix-jdk15on    1.60
        org.bouncycastle    bcprov-jdk15on    1.60
    org.bouncycastle    bcprov-jdk15on    1.60
    org.slf4j    slf4j-api    1.7.25
com.typesafe.akka    akka-stream_2.12    2.5.30
    com.typesafe.akka    akka-actor_2.12    2.5.30
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0
    com.typesafe.akka    akka-protobuf_2.12    2.5.30
    com.typesafe    ssl-config-core_2.12    0.3.8
        com.typesafe    config    1.3.3
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.2
    org.reactivestreams    reactive-streams    1.0.2
commons-net    commons-net    3.6
org.scala-lang    scala-library    2.12.10

Configuring the connection settings

In order to establish a connection with the remote server, you need to provide a specialized version of a RemoteFileSettings instance. It’s specialized as it depends on the kind of server you’re connecting to: FTP, FTPs or SFTP.

Scala
val ftpSettings = FtpSettings
  .create(InetAddress.getByName(HOSTNAME))
  .withPort(PORT)
  .withCredentials(CREDENTIALS)
  .withBinary(true)
  .withPassiveMode(true)
  // only useful for debugging
  .withConfigureConnection((ftpClient: FTPClient) => {
    ftpClient.addProtocolCommandListener(new PrintCommandListener(new PrintWriter(System.out), true))
  })
Java
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.javadsl.Source;
import org.apache.commons.net.PrintCommandListener;
import org.apache.commons.net.ftp.FTPClient;
import java.net.InetAddress;

FtpSettings ftpSettings =
    FtpSettings.create(InetAddress.getByName(HOSTNAME))
        .withPort(PORT)
        .withCredentials(CREDENTIALS)
        .withBinary(true)
        .withPassiveMode(true)
        // only useful for debugging
        .withConfigureConnectionConsumer(
            (FTPClient ftpClient) -> {
              ftpClient.addProtocolCommandListener(
                  new PrintCommandListener(new PrintWriter(System.out), true));
            });

The configuration above will create an anonymous connection with a remote FTP server in passive mode. For both FTPs and SFTP servers, you will need to provide the specialized versions of these settings: FtpsSettings or SftpSettings respectively.

The example demonstrates optional use of configureConnection option available on FTP and FTPs clients. Use it to configure any custom parameters the server may require, such as explicit or implicit data transfer encryption.

For non-anonymous connection, please provide an instance of NonAnonFtpCredentials instead.

For connection via a proxy, please provide an instance of java.net.Proxy by using the withProxy method.

For connection using a private key, please provide an instance of SftpIdentity to SftpSettings.

In order to use a custom SSH client for SFTP please provide an instance of SSHClient.

Scala
import akka.stream.alpakka.ftp.scaladsl.{Sftp, SftpApi}
import net.schmizz.sshj.{DefaultConfig, SSHClient}

val sshClient: SSHClient = new SSHClient(new DefaultConfig)
val configuredClient: SftpApi = Sftp(sshClient)
Java

import akka.stream.alpakka.ftp.javadsl.Sftp; import akka.stream.alpakka.ftp.javadsl.SftpApi; import net.schmizz.sshj.DefaultConfig; import net.schmizz.sshj.SSHClient; public class ConfigureCustomSSHClient { public ConfigureCustomSSHClient() { SSHClient sshClient = new SSHClient(new DefaultConfig()); SftpApi sftp = Sftp.create(sshClient); } }

Traversing a remote FTP folder recursively

In order to traverse a remote folder recursively, you need to use the ls method in the FTP API:

Scala
import akka.NotUsed
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Source

def listFiles(basePath: String, settings: FtpSettings): Source[FtpFile, NotUsed] =
  Ftp.ls(basePath, settings)
Java
import akka.NotUsed;
import akka.stream.Materializer;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;

public class FtpTraversingExample {

  public void listFiles(String basePath, FtpSettings settings, Materializer materializer)
      throws Exception {
    Ftp.ls(basePath, settings)
        .runForeach(ftpFile -> System.out.println(ftpFile.toString()), materializer);
  }
}

This source will emit FtpFile elements with no significant materialization.

For both FTPs and SFTP servers, you will need to use the FTPs and SFTP API respectively.

Retrieving files

In order to retrieve a remote file as a stream of bytes, you need to use the fromPath method in the FTP API:

Scala
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.Future

def retrieveFromPath(path: String, settings: FtpSettings): Source[ByteString, Future[IOResult]] =
  Ftp.fromPath(path, settings)
Java
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

public class FtpRetrievingExample {

  public Source<ByteString, CompletionStage<IOResult>> retrieveFromPath(
      String path, FtpSettings settings) throws Exception {
    return Ftp.fromPath(path, settings);
  }
}

This source will emit ByteString elements and materializes to Future in Scala API and CompletionStage in Java API of IOResult when the stream finishes.

For both FTPs and SFTP servers, you will need to use the FTPs and SFTP API respectively.

Writing files

In order to store a remote file from a stream of bytes, you need to use the toPath method in the FTP API:

Scala
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.util.ByteString
import scala.concurrent.Future

val result: Future[IOResult] = Source
  .single(ByteString("this is the file contents"))
  .runWith(Ftp.toPath("file.txt", ftpSettings))

// Create a gzipped target file
import akka.stream.scaladsl.Compression
val result: Future[IOResult] = Source
  .single(ByteString("this is the file contents" * 50))
  .via(Compression.gzip)
  .runWith(Ftp.toPath("file.txt.gz", ftpSettings))
Java
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.IOResult;
import akka.stream.alpakka.testkit.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.Compression;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.util.ByteString;
import java.util.concurrent.CompletionStage;

CompletionStage<IOResult> result =
    Source.single(ByteString.fromString("this is the file contents"))
        .runWith(Ftp.toPath("file.txt", ftpSettings), materializer);

// Create a gzipped target file
CompletionStage<IOResult> result =
    Source.single(ByteString.fromString("this is the file contents"))
        .via(Compression.gzip())
        .runWith(Ftp.toPath("file.txt.gz", ftpSettings), materializer);

This sink will consume ByteString elements and materializes to Future in Scala API and CompletionStage in Java API of IOResult when the stream finishes.

For both FTPs and SFTP servers, you will need to use the FTPs and SFTP API respectively.

Removing files

In order to remove a remote file, you need to use the remove method in the FTP API:

Scala
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

def remove(settings: FtpSettings): Sink[FtpFile, Future[IOResult]] =
  Ftp.remove(settings)
Java
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;

public class FtpRemovingExample {

  public Sink<FtpFile, CompletionStage<IOResult>> remove(FtpSettings settings) throws Exception {
    return Ftp.remove(settings);
  }
}

This sink will consume FtpFile elements and materializes to Future in Scala API and CompletionStage in Java API of IOResult when the stream finishes.

Moving files

In order to move a remote file, you need to use the move method in the FTP API. The move method takes a function to calculate the path to which the file should be moved based on the consumed FtpFile.

Scala
import akka.stream.IOResult
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

def move(destinationPath: FtpFile => String, settings: FtpSettings): Sink[FtpFile, Future[IOResult]] =
  Ftp.move(destinationPath, settings)
Java
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Sink;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

public class FtpMovingExample {

  public Sink<FtpFile, CompletionStage<IOResult>> move(
      Function<FtpFile, String> destinationPath, FtpSettings settings) throws Exception {
    return Ftp.move(destinationPath, settings);
  }
}

This sink will consume FtpFile elements and materializes to Future in Scala API and CompletionStage in Java API of IOResult when the stream finishes.

Typical use-case for this would be listing files from a ftp location, do some processing and move the files when done. An example of this use case can be found below.

Creating directory

In order to create a directory the user has to specify a parent directory (also known as base path) and directory’s name.

Alpakka provides a materialized API mkdirAsync (based on FutureCompletion Stage) and unmaterialized API mkdir (using Sources) to let the user choose when the action will be executed.

Scala

import akka.NotUsed import akka.stream.scaladsl.Source import akka.stream.alpakka.ftp.scaladsl.Ftp import akka.Done def mkdir(basePath: String, directoryName: String, settings: FtpSettings): Source[Done, NotUsed] = Ftp.mkdir(basePath, directoryName, settings)
Java
import akka.Done;
import akka.NotUsed;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.Source;

public class FtpMkdirExample {
  public Source<Done, NotUsed> mkdir(
      String parentPath, String directoryName, FtpSettings settings) {
    return Ftp.mkdir(parentPath, directoryName, settings);
  }
}

Please note that to include a subdirectory in result of ls the emitTraversedDirectories has to be set to true.

Example: downloading files from an FTP location and move the original files

Scala
import java.nio.file.Files

import akka.NotUsed
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, RunnableGraph}

def processAndMove(sourcePath: String,
                   destinationPath: FtpFile => String,
                   settings: FtpSettings): RunnableGraph[NotUsed] =
  Ftp
    .ls(sourcePath, settings)
    .flatMapConcat(ftpFile => Ftp.fromPath(ftpFile.path, settings).map((_, ftpFile)))
    .alsoTo(FileIO.toPath(Files.createTempFile("downloaded", "tmp")).contramap(_._1))
    .to(Ftp.move(destinationPath, settings).contramap(_._2))
Java
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.alpakka.ftp.FtpFile;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.RunnableGraph;

import java.nio.file.Files;
import java.util.function.Function;

public class FtpProcessAndMoveExample {

  public RunnableGraph<NotUsed> processAndMove(
      String sourcePath, Function<FtpFile, String> destinationPath, FtpSettings settings)
      throws Exception {
    return Ftp.ls(sourcePath, settings)
        .flatMapConcat(
            ftpFile ->
                Ftp.fromPath(ftpFile.path(), settings).map(data -> new Pair<>(data, ftpFile)))
        .alsoTo(FileIO.toPath(Files.createTempFile("downloaded", "tmp")).contramap(Pair::first))
        .to(Ftp.move(destinationPath, settings).contramap(Pair::second));
  }
}

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to browse the code, edit and run it in sbt.

```
docker-compose up -d ftp sftp
sbt
> ftp/test
```
Warning

When using the SFTP API, take into account that JVM relies on /dev/random for random number generation by default. This might potentially block the process on some operating systems as /dev/random waits for a certain amount of entropy to be generated on the host machine before returning a result. In such case, please consider providing the parameter -Djava.security.egd = file:/dev/./urandom into the execution context. Further information can be found here.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.