FTP
Artifacts
- sbt
libraryDependencies ++= Seq( "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.0.2", "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "1.0.2" )
- Maven
<dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-file_2.12</artifactId> <version>1.0.2</version> </dependency> <dependency> <groupId>com.lightbend.akka</groupId> <artifactId>akka-stream-alpakka-ftp_2.12</artifactId> <version>1.0.2</version> </dependency>
- Gradle
dependencies { compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '1.0.2', compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-ftp_2.12', version: '1.0.2' }
Example: Copy all files from an FTP server to local files
- list FTP server contents (1),
- just bother about file entries (2),
- for each file prepare for awaiting
Future
CompletionStage
results ignoring the stream order (3), - run a new stream copying the file contents to a local file (4),
- combine the filename and the copying result (5),
- collect all filenames with results into a sequence (6)
- Scala
-
import java.net.InetAddress import java.nio.file.Paths import akka.stream.IOResult import akka.stream.alpakka.ftp.FtpSettings import akka.stream.alpakka.ftp.scaladsl.Ftp import akka.stream.scaladsl.{FileIO, Sink} import org.apache.mina.util.AvailablePortFinder import playground.filesystem.FileSystemMock import playground.{ActorSystemAvailable, FtpServerEmbedded} import scala.collection.immutable import scala.concurrent.Future import scala.util.{Failure, Success} val ftpSettings = FtpSettings(InetAddress.getByName("localhost")).withPort(port) Ftp .ls("/", ftpSettings) //: FtpFile (1) .filter(ftpFile => ftpFile.isFile) //: FtpFile (2) .mapAsyncUnordered(parallelism = 5) { ftpFile => // (3) val localPath = targetDir.resolve("." + ftpFile.path) val fetchFile: Future[IOResult] = Ftp .fromPath(ftpFile.path, ftpSettings) .runWith(FileIO.toPath(localPath)) // (4) fetchFile.map { ioResult => // (5) (ftpFile.path, ioResult) } } //: (String, IOResult) .runWith(Sink.seq) // (6)
- Java
-
import akka.japi.Pair; import akka.stream.IOResult; import akka.stream.alpakka.ftp.FtpSettings; import akka.stream.alpakka.ftp.javadsl.Ftp; import akka.stream.javadsl.FileIO; import akka.stream.javadsl.Sink; import org.apache.ftpserver.FtpServer; import org.apache.mina.util.AvailablePortFinder; import playground.ActorSystemAvailable; import playground.FtpServerEmbedded; import playground.filesystem.FileSystemMock; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; final FtpSettings ftpSettings = FtpSettings.create(InetAddress.getByName("localhost")).withPort(port); final Integer parallelism = 5; Ftp.ls("/", ftpSettings) // : FtpFile (1) .filter(ftpFile -> ftpFile.isFile()) // : FtpFile (2) .mapAsyncUnordered( parallelism, ftpFile -> { // (3) final Path localPath = targetDir.resolve("." + ftpFile.path()); final CompletionStage<IOResult> fetchFile = Ftp.fromPath(ftpFile.path(), ftpSettings) .runWith(FileIO.toPath(localPath), actorMaterializer()); // (4) return fetchFile.thenApply( ioResult -> // (5) Pair.create(ftpFile.path(), ioResult)); }) // : (String, IOResult) .runWith(Sink.seq(), actorMaterializer()); // (6)
Example: Rotate data stream over to multiple compressed files on SFTP server
- generate data stream with changing contents over time (1),
- function that tracks last element and outputs a new path when contents in the stream change (2),
- prepare SFTP credentials and settings (3),
- compress ByteStrings (4)
- Scala
-
val data = ('a' to 'd') // (1) .flatMap(letter => Seq.fill(10)(ByteString(letter.toString * 10000))) // (2) val rotator = () => { var last: Char = ' ' (bs: ByteString) => { bs.head.toChar match { case char if char != last => last = char Some(s"log-$char.z") case _ => None } } } // (3) val identity = SftpIdentity.createFileSftpIdentity(pathToIdentityFile, privateKeyPassphrase) val credentials = FtpCredentials.create(username, password) val settings = SftpSettings(InetAddress.getByName(hostname)) .withPort(port) .withSftpIdentity(identity) .withStrictHostKeyChecking(false) .withCredentials(credentials) val sink = (path: String) => Flow[ByteString] .via(Compression.gzip) // (4) .toMat(Sftp.toPath(s"tmp/$path", settings))(Keep.right) val completion = Source(data).runWith(LogRotatorSink.withSinkFactory(rotator, sink))
Running the example code
This example is contained in a stand-alone runnable main, it can be run from sbt
like this:
- Scala
-
sbt > doc-examples/runMain ftpsamples.FtpToFile > doc-examples/runMain ftpsamples.RotateLogsToFtp
- Java
-
sbt > doc-examples/runMain ftpsamples.FtpToFileExample