File
The File connectors provide additional connectors for filesystems, complementing the sources and sinks for files already included in core Akka Streams (see the Akka Streams Streaming File IO documentation).
| Project Info: Alpakka File | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-file : 9.0.1 |
| JDK versions | Eclipse Temurin JDK 11, Eclipse Temurin JDK 17 |
| Scala versions | 2.13.12, 3.3.4 |
| JPMS module name | akka.stream.alpakka.file |
| Readiness level | Since 0.1, 2016-11-16 |
| Home page | https://doc.akka.io/libraries/alpakka/current |
| Release notes | GitHub releases |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.
Additionally, add the dependencies as below.
- sbt
```scala
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-file" % "9.0.1",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
```
- Maven
- Gradle
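The repository itself must be added as a resolver. A minimal sbt sketch, assuming the standard Akka library repository URL (verify it against the Akka documentation for your version):

```scala
// Assumption: this is the standard Akka library repository URL; check the
// Akka documentation if your build resolves artifacts through a mirror.
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
```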
The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.
- Direct dependencies
| Organization | Artifact | Version |
|---|---|---|
| com.typesafe.akka | akka-stream_2.13 | 2.10.0 |
| org.scala-lang | scala-library | 2.13.12 |

- Dependency tree
Writing to and reading from files
Use the FileIO class to create streams reading from or writing to files. It is part of Akka Streams; see the Akka Streams Streaming File IO documentation.
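As a quick reminder of that core API, a minimal sketch that copies one file into another, assuming an implicit ActorSystem (for the materializer) is in scope; the file names are illustrative:

```scala
import java.nio.file.Paths
import scala.concurrent.Future

import akka.stream.IOResult
import akka.stream.scaladsl.{ FileIO, Source }
import akka.util.ByteString

// Read the file as chunks of bytes ...
val bytes: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get("in.txt"))

// ... and stream them into another file.
val copied: Future[IOResult] = bytes.runWith(FileIO.toPath(Paths.get("out.txt")))
```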
Tailing a file into a stream
The FileTailSource starts at a given offset in a file and emits chunks of bytes until reaching the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).
A very common use case is combining reading bytes with parsing the bytes into lines; therefore FileTailSource contains a few factory methods to create a source that parses the bytes into lines and emits those.
In this sample we simply tail the lines of a file and print them to standard out:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.FileTailSource

val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))
```
- Java
Shutdown stream when file is deleted
The FileTailSource stream will not shut down or throw an error when the file it is tailing is deleted from the filesystem. If you would like to shut down the stream, or throw an error, you can do so by merging in a DirectoryChangesSource that listens to filesystem events in the directory that contains the file.
In the following example, a DirectoryChangesSource is used to watch for events in a directory. If a file delete event is observed for the file we are tailing, then we shut down the stream gracefully by using Flow.recoverWithRetries to switch to a Source.empty, which will immediately send an OnComplete signal and shut down the stream.
- Scala
```scala
val checkInterval = 1.second
val fileCheckSource = DirectoryChangesSource(path.getParent, checkInterval, 8192)
  .collect {
    case (p, DirectoryChange.Deletion) if path == p =>
      throw new FileNotFoundException(path.toString)
  }
  .recoverWithRetries(1, {
    case _: FileNotFoundException => Source.empty
  })

val stream = FileTailSource
  .lines(path = path, maxLineSize = 8192, pollingInterval = 250.millis)
  .merge(fileCheckSource, eagerComplete = true)
```
- Java
Since the DirectoryChangesSource and the FileTailSource operate asynchronously as separate sources, there is the possibility that the stream could be shut down prematurely. If the file is detected as deleted and the stream is shut down before the last element is emitted from FileTailSource, then that data will never be available to downstream user stages.
Shutdown stream after an idle timeout
It may be useful to shut down the stream when no new data has been added for a while to a file being tailed by FileTailSource. In the following example, a Flow.idleTimeout operator is used to trigger a TimeoutException that can be recovered with Flow.recoverWithRetries and a Source.empty to successfully shut down the stream.
- Scala
```scala
val stream = FileTailSource
  .lines(path = path, maxLineSize = 8192, pollingInterval = 250.millis)
  .idleTimeout(5.seconds)
  .recoverWithRetries(1, {
    case _: TimeoutException => Source.empty
  })
```
- Java
Creating directories
Directory.mkdirs() and Directory.mkdirsWithContext() create directories for Path elements in the stream. The withContext-variant allows the user to pass through additional information with every path.
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.Directory

val flow: Flow[Path, Path, NotUsed] = Directory.mkdirs()

val created: Future[immutable.Seq[Path]] =
  Source(immutable.Seq(dir.resolve("dirA"), dir.resolve("dirB")))
    .via(flow)
    .runWith(Sink.seq)

val flowWithContext: FlowWithContext[Path, SomeContext, Path, SomeContext, NotUsed] =
  Directory.mkdirsWithContext[SomeContext]()
```
- Java
Listing directory contents
Directory.ls(path) lists all files and directories directly in a given directory:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.Directory

val source: Source[Path, NotUsed] = Directory.ls(dir)
```
- Java
Directory.walk(path) traverses all subdirectories and lists files and directories depth-first:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.Directory
import java.nio.file.FileVisitOption

val files: Source[Path, NotUsed] = Directory.walk(root)

val files2: Source[Path, NotUsed] =
  Directory.walk(root, maxDepth = Some(1), List(FileVisitOption.FOLLOW_LINKS))
```
- Java
Listening to changes in a directory
The DirectoryChangesSource will emit an element every time there is a change to a watched directory in the local filesystem. The emitted change consists of the path that was changed and an enumeration describing what kind of change it was.
In this sample we simply print each change to the directory to standard output:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

val fs = FileSystems.getDefault
val changes =
  DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)

changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
```
- Java
Rotating the file to stream into
The LogRotatorSink will create and write to multiple files.
This sink takes a creator as a parameter which returns a ByteString => Option[Path] function. If the generated function returns a path, the sink rotates the file output to this new path and the triggering ByteString is written to the new file as well. With this approach the user can define a custom stateful file generation implementation.
This example usage shows the built-in target file creation and a custom sink factory, which is required to use Compression for the target files.
- Scala
```scala
val triggerFunctionCreator: () => ByteString => Option[Path] = ???

val source = Source(immutable.Seq("test1", "test2", "test3", "test4", "test5", "test6"))
  .map(ByteString(_))

val completion = source.runWith(LogRotatorSink(triggerFunctionCreator))

// GZip compressing the data written
val completionGzipped = source
  .runWith(
    LogRotatorSink.withSinkFactory(
      triggerFunctionCreator,
      (path: Path) =>
        Flow[ByteString]
          .via(Compression.gzip)
          .toMat(FileIO.toPath(path))(Keep.right)
    )
  )
```
- Java
Example: size-based rotation
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.LogRotatorSink

val fileSizeTriggerCreator: () => ByteString => Option[Path] = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  (element: ByteString) =>
    if (size + element.size > max) {
      val path = Files.createTempFile("out-", ".log")
      size = element.size
      Some(path)
    } else {
      size += element.size
      None
    }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(fileSizeTriggerCreator)
```
- Java
Example: time-based rotation
- Scala
```scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedTriggerCreator: () => ByteString => Option[Path] = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) => {
    val newName = LocalDateTime.now().format(formatter)
    if (currentFilename.contains(newName)) {
      None
    } else {
      currentFilename = Some(newName)
      Some(destinationDir.resolve(newName))
    }
  }
}

val timeBasedSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(timeBasedTriggerCreator)
```
- Java
Example: content-based rotation with compression to SFTP file
This example can be found in the self-contained example documentation section.
ZIP Archive
Writing ZIP Archives
Archive contains a flow for compressing multiple files into one ZIP file.
The output of the flow can be sent to a sink even before the whole ZIP file is created, so the size of the resulting ZIP archive is not limited by available memory.
This example usage shows compressing files from disk.
- Scala
```scala
val fileStream1: Source[ByteString, Any] = ...
val fileStream2: Source[ByteString, Any] = ...

val filesStream = Source(
  List(
    (ArchiveMetadata("akka_full_color.svg"), fileStream1),
    (ArchiveMetadata("akka_icon_reverse.svg"), fileStream2)
  )
)

val result = filesStream
  .via(Archive.zip())
  .runWith(FileIO.toPath(Paths.get("result.zip")))
```
- Java
Reading ZIP archives
Archive.zipReader() reads a file in ZIP format, emitting the metadata entry and a Source for every file in the archive. It is not required to read every file, and multiple files can be read in parallel. (Every sub-source will seek into the archive.)
The example below reads the incoming file and unzips all entries to the local file system.
- Scala
```scala
val zipFile = // ???
val target: Path = // ???

Archive
  .zipReader(zipFile)
  .mapAsyncUnordered(4) {
    case (metadata, source) =>
      val targetFile = target.resolve(metadata.name)
      targetFile.toFile.getParentFile.mkdirs() // missing error handler
      source.runWith(FileIO.toPath(targetFile))
  }
```
- Java
TAR Archive
Writing TAR archives
Archive.tar() creates a flow for packaging multiple files into one TAR archive.
The output of the flow can be sent to a sink even before the whole TAR file is created, so the size of the resulting TAR archive is not limited by available memory.
This example usage shows packaging directories and files from disk.
- Scala
```scala
val fileStream1: Source[ByteString, Any] = ...
val fileStream2: Source[ByteString, Any] = ...
val fileSize1: Long = ...
val fileSize2: Long = ...

val filesStream = Source(
  List(
    (TarArchiveMetadata.directory("subdir", lastModification), Source.empty),
    (TarArchiveMetadata("subdir", "akka_full_color.svg", fileSize1, lastModification), fileStream1),
    (TarArchiveMetadata("akka_icon_reverse.svg", fileSize2, lastModification), fileStream2)
  )
)

val result = filesStream
  .via(Archive.tar())
  .runWith(FileIO.toPath(Paths.get("result.tar")))
```
- Java
To produce a gzipped TAR file see the following example.
- Scala
```scala
val resultGz = filesStream
  .via(Archive.tar().via(akka.stream.scaladsl.Compression.gzip))
  .runWith(FileIO.toPath(Paths.get("result.tar.gz")))
```
- Java
Reading TAR archives
Archive.tarReader() reads a stream of ByteStrings in TAR format, emitting the metadata entry and a Source for every file in the archive. It is essential to request all of the emitted source's data, otherwise the stream will not reach the next file entry.
The example below reads the incoming stream, creates directories and stores all files in the local file system.
- Scala
```scala
val bytesSource: Source[ByteString, NotUsed] = // ???

val tar = bytesSource
  .via(Archive.tarReader())
  .mapAsync(1) {
    case (metadata, source) =>
      val targetFile = target.resolve(metadata.filePath)
      if (metadata.isDirectory) {
        Source
          .single(targetFile)
          .via(Directory.mkdirs())
          .runWith(Sink.ignore)
      } else {
        // create the target directory
        Source
          .single(targetFile.getParent)
          .via(Directory.mkdirs())
          .runWith(Sink.ignore)
          .map { _ =>
            // stream the file contents to a local file
            source.runWith(FileIO.toPath(targetFile))
          }
      }
  }
  .runWith(Sink.ignore)
```
- Java
The test in NestedTarReaderTest illustrates how the tar reader may be used to extract tar archives from within a tar archive.
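The test itself is not reproduced here, but the idea can be sketched roughly as follows, assuming the outer archive holds a single entry that is itself a TAR file (value names are illustrative, not taken from the test):

```scala
// Sketch only; outerTarBytes and innerEntries are hypothetical names.
val outerTarBytes: Source[ByteString, NotUsed] = // ???

val innerEntries: Future[Seq[TarArchiveMetadata]] =
  outerTarBytes
    .via(Archive.tarReader())
    // assumption: the single outer entry is itself a tar archive,
    // so its bytes can be fed through the tar reader a second time
    .flatMapConcat { case (_, nestedTarBytes) => nestedTarBytes }
    .via(Archive.tarReader())
    .mapAsync(1) {
      case (metadata, fileBytes) =>
        // drain each entry's bytes so the reader can advance to the next entry
        fileBytes.runWith(Sink.fold(metadata)((acc, _: ByteString) => acc))
    }
    .runWith(Sink.seq)
```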