File
The File connectors provide additional connectors for the filesystem, complementing the file sources and sinks already included in core Akka Streams (found in akka.stream.scaladsl.FileIO and akka.stream.javadsl.FileIO).
| Project Info: Alpakka File | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-file : 1.0.2 |
| JDK versions | OpenJDK 8 |
| Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
| JPMS module name | akka.stream.alpakka.file |
| Readiness level | Since 0.1, 2016-11-16 |
| Home page | https://doc.akka.io/docs/alpakka/current/ |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt

```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.0.2"
```

- Maven

```xml
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
```

- Gradle

```gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '1.0.2'
}
```
The table below shows the direct dependencies of this module, followed by the full tree of libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
| org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |

- Dependency tree

```
com.typesafe.akka  akka-stream_2.12  2.5.22  Apache License, Version 2.0
    com.typesafe.akka  akka-actor_2.12  2.5.22  Apache License, Version 2.0
        com.typesafe  config  1.3.3  Apache License, Version 2.0
        org.scala-lang.modules  scala-java8-compat_2.12  0.8.0  BSD 3-clause
            org.scala-lang  scala-library  2.12.7  BSD 3-Clause
        org.scala-lang  scala-library  2.12.7  BSD 3-Clause
    com.typesafe.akka  akka-protobuf_2.12  2.5.22  Apache License, Version 2.0
        org.scala-lang  scala-library  2.12.7  BSD 3-Clause
    com.typesafe  ssl-config-core_2.12  0.3.7  Apache-2.0
        com.typesafe  config  1.3.3  Apache License, Version 2.0
        org.scala-lang.modules  scala-parser-combinators_2.12  1.1.1  BSD 3-clause
            org.scala-lang  scala-library  2.12.7  BSD 3-Clause
        org.scala-lang  scala-library  2.12.7  BSD 3-Clause
    org.reactivestreams  reactive-streams  1.0.2  CC0
    org.scala-lang  scala-library  2.12.7  BSD 3-Clause
org.scala-lang  scala-library  2.12.7  BSD 3-Clause
```
Writing to and reading from files
Use the FileIO class to create streams reading from or writing to files. It is part of core Akka Streams.
See the Akka Streams File IO documentation.
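For orientation, a minimal sketch of copying a file with FileIO from core Akka Streams (the file names are placeholders, and an implicit materializer is assumed to be in scope):

```scala
import java.nio.file.Paths

import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future

// Read a file as a stream of ByteString chunks...
val in: Source[ByteString, Future[IOResult]] = FileIO.fromPath(Paths.get("input.txt"))

// ...and write a stream of ByteStrings back out to another file.
val out: Sink[ByteString, Future[IOResult]] = FileIO.toPath(Paths.get("output.txt"))

// Running the stream copies the file (implicit materializer assumed in scope).
val result: Future[IOResult] = in.runWith(out)
```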
Tailing a file into a stream
The FileTailSource starts at a given offset in a file and emits chunks of bytes until reaching the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).
A very common use case is to parse the bytes into lines as they are read; FileTailSource therefore contains a few factory methods that create a source parsing the bytes into lines and emitting those.
In this sample we simply tail the lines of a file and print them to standard out:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.FileTailSource

val fs = FileSystems.getDefault

val lines: Source[String, NotUsed] =
  FileTailSource.lines(
    path = fs.getPath(path),
    maxLineSize = 8192,
    pollingInterval = 250.millis
  )

lines.runForeach(line => System.out.println(line))
```
- Java
```java
final FileSystem fs = FileSystems.getDefault();
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
    akka.stream.alpakka.file.javadsl.FileTailSource.createLines(
        fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach((line) -> System.out.println(line), materializer);
```
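The line-parsing factories are built on top of a byte-level source. A minimal sketch of tailing raw byte chunks instead of lines (assuming the scaladsl FileTailSource factory that takes path, maxChunkSize, startingPosition and pollingInterval):

```scala
import java.nio.file.FileSystems

import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.duration._

val fs = FileSystems.getDefault

// Emit raw chunks of bytes, starting from the beginning of the file.
val chunks: Source[ByteString, NotUsed] =
  FileTailSource(
    path = fs.getPath(path),
    maxChunkSize = 8192,
    startingPosition = 0,
    pollingInterval = 250.millis
  )
```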
Listing directory contents
Directory.ls(path)
lists all files and directories directly in a given directory:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.Directory

val source: Source[Path, NotUsed] = Directory.ls(dir)
```
- Java
```java
import akka.stream.alpakka.file.javadsl.Directory;

final Source<Path, NotUsed> source = Directory.ls(dir);
```
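The sources above only describe the listing; to actually run one and collect the entries, a small sketch (assuming an implicit materializer is in scope) could look like this:

```scala
import java.nio.file.Path

import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

// Materialize the directory listing into a collection.
val contents: Future[Seq[Path]] = Directory.ls(dir).runWith(Sink.seq)
```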
Directory.walk(path)
traverses all subdirectories and lists files and directories depth first:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.Directory
import java.nio.file.FileVisitOption

val files: Source[Path, NotUsed] = Directory.walk(root)

val files2: Source[Path, NotUsed] =
  Directory.walk(root, maxDepth = Some(1), List(FileVisitOption.FOLLOW_LINKS))
```
- Java
```java
import akka.stream.alpakka.file.javadsl.Directory;
import java.nio.file.FileVisitOption;

final Source<Path, NotUsed> source = Directory.walk(root);

final Source<Path, NotUsed> source2 =
    Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
```
Listening to changes in a directory
The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted element consists of the path that changed and an enumeration describing what kind of change it was.
In this sample we simply print each change to the directory to standard output:
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

val fs = FileSystems.getDefault

val changes =
  DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)

changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
```
- Java
```java
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;

final FileSystem fs = FileSystems.getDefault();
final Duration pollingInterval = Duration.ofSeconds(1);
final int maxBufferSize = 1000;

final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
    DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach(
    (Pair<Path, DirectoryChange> pair) -> {
      final Path changedPath = pair.first();
      final DirectoryChange change = pair.second();
      System.out.println("Path: " + changedPath + ", Change: " + change);
    },
    materializer);
```
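Since each element carries a DirectoryChange value alongside the path, downstream stages can filter on the kind of change. A minimal sketch that only reacts to newly created files (assuming the DirectoryChange.Creation value and an implicit materializer in scope):

```scala
import java.nio.file.FileSystems

import akka.stream.alpakka.file.DirectoryChange
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource

import scala.concurrent.duration._

val fs = FileSystems.getDefault

// Keep only creation events and print the path of each newly created file.
DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
  .collect { case (changedPath, DirectoryChange.Creation) => changedPath }
  .runForeach(changedPath => println("Created: " + changedPath))
```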
Rotating the file to stream into
The LogRotatorSink will create and write to multiple files.
This sink takes a creator function as a parameter which returns a ByteString => Option[Path] function (Function<ByteString, Optional<Path>> in Java). If the generated function returns a path, the sink rotates the file output to this new path and the current ByteString is written to the new file as well. With this approach the user can define a custom stateful file-generation implementation.
This example shows the built-in target file creation and a custom sink factory, which is required in order to use Compression for the target files.
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.LogRotatorSink

val triggerFunctionCreator: () => ByteString => Option[Path] = ???

val source = Source(immutable.Seq("test1", "test2", "test3", "test4", "test5", "test6"))
  .map(ByteString(_))

val completion = source.runWith(LogRotatorSink(triggerFunctionCreator))

// GZip compressing the data written
val compressedCompletion = source.runWith(
  LogRotatorSink.withSinkFactory(
    triggerFunctionCreator,
    (path: Path) =>
      Flow[ByteString]
        .via(Compression.gzip)
        .toMat(FileIO.toPath(path))(Keep.right)
  )
)
```
- Java
```java
import akka.stream.alpakka.file.javadsl.LogRotatorSink;

Creator<Function<ByteString, Optional<Path>>> triggerFunctionCreator = ...;

Source<ByteString, NotUsed> source =
    Source.from(Arrays.asList("test1", "test2", "test3", "test4", "test5", "test6"))
        .map(ByteString::fromString);

CompletionStage<Done> completion =
    source.runWith(LogRotatorSink.createFromFunction(triggerFunctionCreator), materializer);

// GZip compressing the data written
CompletionStage<Done> compressedCompletion =
    source.runWith(
        LogRotatorSink.withSinkFactory(
            triggerFunctionCreator,
            path ->
                Flow.of(ByteString.class)
                    .via(Compression.gzip())
                    .toMat(FileIO.toPath(path), Keep.right())),
        materializer);
```
Example: size-based rotation
- Scala
```scala
import akka.stream.alpakka.file.scaladsl.LogRotatorSink

val fileSizeTriggerCreator: () => ByteString => Option[Path] = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  element: ByteString =>
    if (size + element.size > max) {
      val path = Files.createTempFile("out-", ".log")
      size = element.size
      Some(path)
    } else {
      size += element.size
      None
    }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(fileSizeTriggerCreator)
```
- Java
```java
Creator<Function<ByteString, Optional<Path>>> sizeBasedTriggerCreator =
    () -> {
      long max = 10 * 1024 * 1024;
      final long[] size = new long[] {max};
      return (element) -> {
        if (size[0] + element.size() > max) {
          Path path = Files.createTempFile("out-", ".log");
          size[0] = element.size();
          return Optional.of(path);
        } else {
          size[0] += element.size();
          return Optional.empty();
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> sizeRotatorSink =
    LogRotatorSink.createFromFunction(sizeBasedTriggerCreator);
```
Example: time-based rotation
- Scala
```scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedTriggerCreator: () => ByteString => Option[Path] = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) => {
    val newName = LocalDateTime.now().format(formatter)
    if (currentFilename.contains(newName)) {
      None
    } else {
      currentFilename = Some(newName)
      Some(destinationDir.resolve(newName))
    }
  }
}

val timeBasedSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(timeBasedTriggerCreator)
```
- Java
```java
final Path destinationDir = FileSystems.getDefault().getPath("/tmp");
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

Creator<Function<ByteString, Optional<Path>>> timeBasedTriggerCreator =
    () -> {
      final String[] currentFileName = new String[] {null};
      return (element) -> {
        String newName = LocalDateTime.now().format(formatter);
        if (newName.equals(currentFileName[0])) {
          return Optional.empty();
        } else {
          currentFileName[0] = newName;
          return Optional.of(destinationDir.resolve(newName));
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> timeBasedSink =
    LogRotatorSink.createFromFunction(timeBasedTriggerCreator);
```
Example: content-based rotation with compression to SFTP file
This example can be found in the self-contained example documentation section.
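For a rough idea of what a content-based trigger looks like, here is a trimmed-down local-filesystem sketch: the "ROTATE" marker trigger is purely illustrative, and gzip-compressed local files stand in for the SFTP upload of the full example.

```scala
import java.nio.file.{Files, Path}

import akka.Done
import akka.stream.alpakka.file.scaladsl.LogRotatorSink
import akka.stream.scaladsl.{Compression, FileIO, Flow, Keep, Sink}
import akka.util.ByteString

import scala.concurrent.Future

// Rotate to a fresh file whenever an element contains the marker "ROTATE" (illustrative only).
val contentTriggerCreator: () => ByteString => Option[Path] = () => {
  var started = false
  element: ByteString =>
    if (!started || element.utf8String.contains("ROTATE")) {
      started = true
      Some(Files.createTempFile("out-", ".log.gz"))
    } else None
}

// Compress each rotated file with GZip before it is written to disk.
val compressedContentSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink.withSinkFactory(
    contentTriggerCreator,
    (path: Path) =>
      Flow[ByteString]
        .via(Compression.gzip)
        .toMat(FileIO.toPath(path))(Keep.right)
  )
```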