FTP

Artifacts

sbt
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.0.2",
  "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "1.0.2"
)
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-ftp_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
Gradle
dependencies {
  // Alpakka File and FTP connectors (artifacts built for Scala 2.12).
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '1.0.2'
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '1.0.2'
}

Example: Copy all files from an FTP server to local files

  • list FTP server contents (1),
  • keep only the file entries, ignoring everything else (2),
  • for each file prepare for awaiting Future (Scala) / CompletionStage (Java) results ignoring the stream order (3),
  • run a new stream copying the file contents to a local file (4),
  • combine the filename and the copying result (5),
  • collect all filenames with results into a sequence (6)
Scala
import java.net.InetAddress
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.alpakka.ftp.FtpSettings
import akka.stream.alpakka.ftp.scaladsl.Ftp
import akka.stream.scaladsl.{FileIO, Sink}
import org.apache.mina.util.AvailablePortFinder
import playground.filesystem.FileSystemMock
import playground.{ActorSystemAvailable, FtpServerEmbedded}

import scala.collection.immutable
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Connection settings for the FTP server on localhost; `port` is supplied by the surrounding code.
val ftpSettings = FtpSettings(InetAddress.getByName("localhost")).withPort(port)

  Ftp
    .ls("/", ftpSettings) //: FtpFile (1)
    .filter(_.isFile) //: FtpFile (2)
    .mapAsyncUnordered(parallelism = 5) { remoteFile => // (3)
      // Mirror the remote path underneath the local target directory.
      val destination = targetDir.resolve("." + remoteFile.path)
      Ftp
        .fromPath(remoteFile.path, ftpSettings)
        .runWith(FileIO.toPath(destination)) // (4)
        .map(ioResult => (remoteFile.path, ioResult)) // (5)
    } //: (String, IOResult)
    .runWith(Sink.seq) // (6)
Java
import akka.japi.Pair;
import akka.stream.IOResult;
import akka.stream.alpakka.ftp.FtpSettings;
import akka.stream.alpakka.ftp.javadsl.Ftp;
import akka.stream.javadsl.FileIO;
import akka.stream.javadsl.Sink;
import org.apache.ftpserver.FtpServer;
import org.apache.mina.util.AvailablePortFinder;
import playground.ActorSystemAvailable;
import playground.FtpServerEmbedded;
import playground.filesystem.FileSystemMock;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

// Connection settings for the FTP server on localhost; `port` is supplied by the surrounding code.
final FtpSettings ftpSettings =
    FtpSettings.create(InetAddress.getByName("localhost")).withPort(port);
// Handed to mapAsyncUnordered as its parallelism argument.
final Integer parallelism = 5;

    Ftp.ls("/", ftpSettings) // : FtpFile (1)
        .filter(entry -> entry.isFile()) // : FtpFile (2)
        .mapAsyncUnordered(
            parallelism,
            entry -> { // (3)
              final String remotePath = entry.path();
              // Mirror the remote path underneath the local target directory.
              final Path destination = targetDir.resolve("." + remotePath);
              return Ftp.fromPath(remotePath, ftpSettings)
                  .runWith(FileIO.toPath(destination), actorMaterializer()) // (4)
                  .thenApply(ioResult -> Pair.create(remotePath, ioResult)); // (5)
            }) // : (String, IOResult)
        .runWith(Sink.seq(), actorMaterializer()); // (6)

Example: Rotate data stream over to multiple compressed files on SFTP server

  • generate data stream with changing contents over time (1),
  • function that tracks last element and outputs a new path when contents in the stream change (2),
  • prepare SFTP credentials and settings (3),
  • compress ByteStrings (4)
Scala
// Ten chunks per letter 'a'..'d'; every chunk is that letter repeated 10000 times.
val data = for { // (1)
  letter <- 'a' to 'd'
  _ <- 1 to 10
} yield ByteString(letter.toString * 10000)

// (2)
// Factory for a stateful trigger function: every call to `rotator()` returns a fresh function
// that remembers the first character of the last chunk that caused a rotation. The returned
// function yields Some(path) when a chunk starts with a different character, otherwise None.
val rotator = () => {
  // None until the first chunk arrives, so no character value is reserved as a sentinel.
  var last: Option[Char] = None
  (bs: ByteString) => {
    // headOption keeps an empty chunk from throwing; such a chunk never triggers a rotation.
    bs.headOption.map(_.toChar) match {
      case current @ Some(char) if current != last =>
        last = current
        Some(s"log-$char.z")
      case _ => None
    }
  }
}

// (3)
// SFTP identity built from an identity file path and its passphrase.
val identity = SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase)
// Username/password credentials.
val credentials = FtpCredentials.create(username, password)
// Connection settings for `hostname` and `port`, carrying both the identity and the credentials.
val settings = SftpSettings(InetAddress.getByName(hostname))
  .withPort(port)
  .withSftpIdentity(identity)
  // NOTE(review): strict host key checking is switched off here — presumably only acceptable for
  // a demo against a test server; confirm before reusing these settings against a real host.
  .withStrictHostKeyChecking(false)
  .withCredentials(credentials)

// For a given file name, builds a sink that gzips the incoming chunks and writes them to
// "tmp/<name>" through the SFTP settings; the upload sink's materialized value is kept.
val sink = (path: String) => {
  val upload = Sftp.toPath(s"tmp/$path", settings)
  Flow[ByteString]
    .via(Compression.gzip) // (4)
    .toMat(upload)(Keep.right)
}

// Runs the generated data through a sink that asks `rotator` for a new path per chunk and
// opens a fresh `sink` whenever one is returned.
val rotatingSink = LogRotatorSink.withSinkFactory(rotator, sink)
val completion = Source(data).runWith(rotatingSink)

Running the example code

These examples are contained in stand-alone runnable mains; they can be run from sbt like this:

Scala
sbt
> doc-examples/runMain ftpsamples.FtpToFile
> doc-examples/runMain ftpsamples.RotateLogsToFtp
Java
sbt
> doc-examples/runMain ftpsamples.FtpToFileExample
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.