File

The File connectors provide additional connectors for filesystems, complementing the file sources and sinks already included in core Akka Streams (see akka.stream.javadsl.FileIO for Java and akka.stream.scaladsl.FileIO for Scala).

Project Info: Alpakka File
Artifact: com.lightbend.akka, akka-stream-alpakka-file, 1.0.2
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12, 2.13.0-M5
JPMS module name: akka.stream.alpakka.file
License
Readiness level: Since 0.1, 2016-11-16
Home page: https://doc.akka.io/docs/alpakka/current/
API documentation
Forums
Release notes: In the documentation
Issues: GitHub issues
Sources: https://github.com/akka/alpakka

Artifacts

sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-file" % "1.0.2"
Maven
<dependency>
  <groupId>com.lightbend.akka</groupId>
  <artifactId>akka-stream-alpakka-file_2.12</artifactId>
  <version>1.0.2</version>
</dependency>
Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-file_2.12', version: '1.0.2'
}

The first table below shows the direct dependencies of this module, and the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.22    Apache License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.22    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.22    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.22    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Writing to and reading from files

Use the FileIO class to create streams that read from or write to files. It is part of Akka Streams; see the Akka Streaming File IO documentation for details.

Tailing a file into a stream

The FileTailSource starts at a given offset in a file and emits chunks of bytes until it reaches the end of the file. It then polls the file for changes and emits new data as it is written to the file (unless there is backpressure).

A very common use case is to combine reading bytes with parsing them into lines, so FileTailSource provides a few factory methods that create a source which parses the bytes into lines and emits those.

In this sample we simply tail the lines of a file and print them to standard output:

Scala
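import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.FileTailSource
import akka.stream.scaladsl.Source
import java.nio.file.FileSystems
import scala.concurrent.duration._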

val fs = FileSystems.getDefault
val lines: Source[String, NotUsed] = FileTailSource.lines(
  path = fs.getPath(path),
  maxLineSize = 8192,
  pollingInterval = 250.millis
)

lines.runForeach(line => System.out.println(line))
Java
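import akka.NotUsed;
import akka.stream.javadsl.Source;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.FiniteDuration;

final FileSystem fs = FileSystems.getDefault();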
final FiniteDuration pollingInterval = FiniteDuration.create(250, TimeUnit.MILLISECONDS);
final int maxLineSize = 8192;

final Source<String, NotUsed> lines =
    akka.stream.alpakka.file.javadsl.FileTailSource.createLines(
        fs.getPath(path), maxLineSize, pollingInterval);

lines.runForeach((line) -> System.out.println(line), materializer);
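
As a rough illustration of the offset-and-poll behaviour described above, the following JDK-only sketch reads whatever has been appended since the last poll and returns the lines that are now complete. It is not how FileTailSource is implemented: the class TailSketch and its methods poll and demo are hypothetical names introduced here, and backpressure, the maximum line size and the polling timer are left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Alpakka.
class TailSketch {
  private final Path path;
  private long position; // offset of the next unread byte
  private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // unfinished line

  TailSketch(Path path, long startingPosition) {
    this.path = path;
    this.position = startingPosition;
  }

  /** One polling round: reads the bytes appended since the last round, returns completed lines. */
  List<String> poll() {
    try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
      channel.position(position);
      ByteBuffer chunk = ByteBuffer.allocate(8192);
      int read;
      while ((read = channel.read(chunk)) > 0) {
        pending.write(chunk.array(), 0, read);
        position += read;
        chunk.clear();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // Split the buffered bytes on line feeds; keep the trailing partial line for the next poll.
    byte[] bytes = pending.toByteArray();
    List<String> lines = new ArrayList<>();
    int lineStart = 0;
    for (int i = 0; i < bytes.length; i++) {
      if (bytes[i] == '\n') {
        lines.add(new String(bytes, lineStart, i - lineStart, StandardCharsets.UTF_8));
        lineStart = i + 1;
      }
    }
    pending.reset();
    pending.write(bytes, lineStart, bytes.length - lineStart);
    return lines;
  }

  /** Writes to a temporary file, polls, appends and polls again; returns what each poll produced. */
  static List<List<String>> demo() {
    try {
      Path file = Files.createTempFile("tail-sketch-", ".log");
      Files.write(file, "first\nsecond\npart".getBytes(StandardCharsets.UTF_8));
      TailSketch tail = new TailSketch(file, 0L);
      List<String> firstPoll = tail.poll();
      Files.write(file, "ial\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
      List<String> secondPoll = tail.poll();
      Files.delete(file);
      return Arrays.asList(firstPoll, secondPoll);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Running main prints [[first, second], [partial]]: the unfinished line is held back until its line feed arrives in a later poll.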

Listing directory contents

Directory.ls(path) lists all files and directories directly in a given directory:

Scala
import akka.stream.alpakka.file.scaladsl.Directory

val source: Source[Path, NotUsed] = Directory.ls(dir)
Java
import akka.stream.alpakka.file.javadsl.Directory;

final Source<Path, NotUsed> source = Directory.ls(dir);

Directory.walk(path) traverses all subdirectories and lists files and directories depth first:

Scala
import akka.stream.alpakka.file.scaladsl.Directory
import java.nio.file.FileVisitOption

val files: Source[Path, NotUsed] = Directory.walk(root)

val files2: Source[Path, NotUsed] = Directory.walk(root, maxDepth = Some(1), List(FileVisitOption.FOLLOW_LINKS))
Java
import akka.stream.alpakka.file.javadsl.Directory;
import java.nio.file.FileVisitOption;

final Source<Path, NotUsed> source = Directory.walk(root);

final Source<Path, NotUsed> source2 = Directory.walk(root, 1, FileVisitOption.FOLLOW_LINKS);
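
To make the difference between a flat listing and a depth-first walk concrete, here is a JDK-only sketch that uses java.nio.file.Files.list and Files.walk as stand-ins. Whether the connector is built on these calls is an assumption, and the class DirectoryListingSketch with its methods ls, walk and sampleTree is a hypothetical name introduced for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only illustration; none of these names belong to Alpakka.
class DirectoryListingSketch {

  /** Direct children of dir only, without descending into subdirectories. */
  static List<Path> ls(Path dir) {
    try (Stream<Path> entries = Files.list(dir)) {
      return entries.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Depth-first traversal, at most maxDepth levels below root. Files.walk also emits root itself. */
  static List<Path> walk(Path root, int maxDepth) {
    try (Stream<Path> entries = Files.walk(root, maxDepth)) {
      return entries.collect(Collectors.toList());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Builds root/a.txt and root/sub/b.txt inside a fresh temporary directory. */
  static Path sampleTree() {
    try {
      Path root = Files.createTempDirectory("listing-sketch-");
      Files.createFile(root.resolve("a.txt"));
      Path sub = Files.createDirectory(root.resolve("sub"));
      Files.createFile(sub.resolve("b.txt"));
      return root;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Path root = sampleTree();
    System.out.println("ls:             " + ls(root).size() + " entries");
    System.out.println("walk, depth 1:  " + walk(root, 1).size() + " entries");
    System.out.println("walk, no limit: " + walk(root, Integer.MAX_VALUE).size() + " entries");
  }
}
```

For the sample tree, the listing has 2 entries, the depth-1 walk has 3 (the root plus its two children) and the unlimited walk has 4, with sub/b.txt emitted directly after sub.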

Listening to changes in a directory

The DirectoryChangesSource emits an element every time there is a change to a watched directory in the local filesystem. Each emitted change consists of the path that was changed and an enumeration value describing what kind of change it was.

In this sample we simply print each change to the directory to standard output:

Scala
import akka.stream.alpakka.file.scaladsl.DirectoryChangesSource
import java.nio.file.FileSystems
import scala.concurrent.duration._

val fs = FileSystems.getDefault
val changes = DirectoryChangesSource(fs.getPath(path), pollInterval = 1.second, maxBufferSize = 1000)
changes.runForeach {
  case (path, change) => println("Path: " + path + ", Change: " + change)
}
Java
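import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.alpakka.file.DirectoryChange;
import akka.stream.alpakka.file.javadsl.DirectoryChangesSource;
import akka.stream.javadsl.Source;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.time.Duration;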

final FileSystem fs = FileSystems.getDefault();
final Duration pollingInterval = Duration.ofSeconds(1);
final int maxBufferSize = 1000;
final Source<Pair<Path, DirectoryChange>, NotUsed> changes =
    DirectoryChangesSource.create(fs.getPath(path), pollingInterval, maxBufferSize);

changes.runForeach(
    (Pair<Path, DirectoryChange> pair) -> {
      final Path changedPath = pair.first();
      final DirectoryChange change = pair.second();
      System.out.println("Path: " + changedPath + ", Change: " + change);
    },
    materializer);

Rotating the file to stream into

The LogRotatorSink creates and writes to multiple files.
This sink takes a creator as a parameter, which returns a ByteString => Option[Path] function (Scala) or a Function<ByteString, Optional<Path>> (Java). If the generated function returns a path, the sink rotates the file output to this new path and the current ByteString is written to the new file as well. With this approach the user can define a custom, stateful file generation implementation.
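
The rotation contract described above can be sketched without Akka. In this JDK-only simulation, byte[] stands in for ByteString and Supplier for the creator; the class RotationSketch and its methods writeAll, everySecondElement and demo are hypothetical names. Rejecting a missing path for the very first element is an assumption of this sketch, not a statement about how the sink behaves.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustration only: byte[] stands in for ByteString and Supplier for the creator.
class RotationSketch {

  /** Feeds every element through the trigger function and returns the files written, in order. */
  static List<Path> writeAll(
      List<byte[]> elements, Supplier<Function<byte[], Optional<Path>>> creator) {
    // The sketch calls the creator once per run, so every run starts with fresh trigger state.
    Function<byte[], Optional<Path>> trigger = creator.get();
    List<Path> files = new ArrayList<>();
    OutputStream current = null;
    try {
      try {
        for (byte[] element : elements) {
          Optional<Path> rotateTo = trigger.apply(element);
          if (rotateTo.isPresent()) {
            // A path was returned: close the old file and switch to the new one.
            if (current != null) current.close();
            current = Files.newOutputStream(rotateTo.get());
            files.add(rotateTo.get());
          }
          if (current == null) {
            // Assumption of this sketch: the first element must select a file.
            throw new IllegalStateException("no path returned for the first element");
          }
          // The element that triggered the rotation is written to the new file too.
          current.write(element);
        }
      } finally {
        if (current != null) current.close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return files;
  }

  /** Stateful trigger that rotates to a new temporary file on every second element. */
  static Supplier<Function<byte[], Optional<Path>>> everySecondElement() {
    return () -> {
      final int[] seen = {0};
      return element -> {
        if (seen[0]++ % 2 != 0) {
          return Optional.empty();
        }
        try {
          return Optional.of(Files.createTempFile("rotation-sketch-", ".log"));
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      };
    };
  }

  /** Writes five one-byte elements and returns the content of each resulting file. */
  static List<String> demo() {
    List<byte[]> elements = new ArrayList<>();
    for (String s : Arrays.asList("a", "b", "c", "d", "e")) {
      elements.add(s.getBytes(StandardCharsets.UTF_8));
    }
    List<String> contents = new ArrayList<>();
    try {
      for (Path file : writeAll(elements, everySecondElement())) {
        contents.add(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        Files.delete(file);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return contents;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Running main prints [ab, cd, e]: each element that triggers a rotation ends up as the first content of the new file.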

This example usage shows the built-in target file creation and a custom sink factory, which is required to use Compression for the target files.

Scala
val triggerFunctionCreator: () => ByteString => Option[Path] = ???

val completion = Source(immutable.Seq("test1", "test2", "test3", "test4", "test5", "test6"))
  .map(ByteString(_))
  .runWith(LogRotatorSink(triggerFunctionCreator))

// GZip compressing the data written
val compressedCompletion =
  source
    .runWith(
      LogRotatorSink.withSinkFactory(
        triggerFunctionCreator,
        (path: Path) =>
          Flow[ByteString]
            .via(Compression.gzip)
            .toMat(FileIO.toPath(path))(Keep.right)
      )
    )
Java
import akka.stream.alpakka.file.javadsl.LogRotatorSink;

Creator<Function<ByteString, Optional<Path>>> triggerFunctionCreator = ...;

CompletionStage<Done> completion =
    Source.from(Arrays.asList("test1", "test2", "test3", "test4", "test5", "test6"))
        .map(ByteString::fromString)
        .runWith(LogRotatorSink.createFromFunction(triggerFunctionCreator), materializer);

// GZip compressing the data written
CompletionStage<Done> compressedCompletion =
    source.runWith(
        LogRotatorSink.withSinkFactory(
            triggerFunctionCreator,
            path ->
                Flow.of(ByteString.class)
                    .via(Compression.gzip())
                    .toMat(FileIO.toPath(path), Keep.right())),
        materializer);

Example: size-based rotation

Scala
import akka.stream.alpakka.file.scaladsl.LogRotatorSink

val fileSizeTriggerCreator: () => ByteString => Option[Path] = () => {
  val max = 10 * 1024 * 1024
  var size: Long = max
  element: ByteString =>
    if (size + element.size > max) {
      val path = Files.createTempFile("out-", ".log")
      size = element.size
      Some(path)
    } else {
      size += element.size
      None
    }
}

val sizeRotatorSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(fileSizeTriggerCreator)
Java
Creator<Function<ByteString, Optional<Path>>> sizeBasedTriggerCreator =
    () -> {
      long max = 10 * 1024 * 1024;
      final long[] size = new long[] {max};
      return (element) -> {
        if (size[0] + element.size() > max) {
          Path path = Files.createTempFile("out-", ".log");
          size[0] = element.size();
          return Optional.of(path);
        } else {
          size[0] += element.size();
          return Optional.empty();
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> sizeRotatorSink =
    LogRotatorSink.createFromFunction(sizeBasedTriggerCreator);

Example: time-based rotation

Scala
val destinationDir = FileSystems.getDefault.getPath("/tmp")
val formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'")

val timeBasedTriggerCreator: () => ByteString => Option[Path] = () => {
  var currentFilename: Option[String] = None
  (_: ByteString) => {
    val newName = LocalDateTime.now().format(formatter)
    if (currentFilename.contains(newName)) {
      None
    } else {
      currentFilename = Some(newName)
      Some(destinationDir.resolve(newName))
    }
  }
}

val timeBasedSink: Sink[ByteString, Future[Done]] =
  LogRotatorSink(timeBasedTriggerCreator)
Java
final Path destinationDir = FileSystems.getDefault().getPath("/tmp");
final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

Creator<Function<ByteString, Optional<Path>>> timeBasedTriggerCreator =
    () -> {
      final String[] currentFileName = new String[] {null};
      return (element) -> {
        String newName = LocalDateTime.now().format(formatter);
        if (newName.equals(currentFileName[0])) {
          return Optional.empty();
        } else {
          currentFileName[0] = newName;
          return Optional.of(destinationDir.resolve(newName));
        }
      };
    };

Sink<ByteString, CompletionStage<Done>> timeBasedSink =
    LogRotatorSink.createFromFunction(timeBasedTriggerCreator);
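
Because the trigger above reads the system time directly, it is awkward to exercise in a test. One possible variation, sketched here with JDK types only, passes in a java.time.Clock so the hour can be controlled. The names ClockTriggerSketch, hourlyTrigger and ManualClock are hypothetical, and byte[] with Supplier stand in for ByteString and the creator.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustration only; none of these names belong to Alpakka.
class ClockTriggerSketch {
  static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("'stream-'yyyy-MM-dd_HH'.log'");

  /** Same logic as the time-based trigger, but the current time comes from the given clock. */
  static Supplier<Function<byte[], Optional<Path>>> hourlyTrigger(Path dir, Clock clock) {
    return () -> {
      final String[] currentFileName = {null};
      return element -> {
        String newName = LocalDateTime.now(clock).format(FORMATTER);
        if (newName.equals(currentFileName[0])) {
          return Optional.empty(); // still within the same hour: keep the current file
        }
        currentFileName[0] = newName;
        return Optional.of(dir.resolve(newName));
      };
    };
  }

  /** Clock whose instant is set by hand; always reports UTC. */
  static final class ManualClock extends Clock {
    private Instant now;

    ManualClock(Instant start) {
      this.now = start;
    }

    void set(Instant instant) {
      this.now = instant;
    }

    @Override
    public ZoneId getZone() {
      return ZoneOffset.UTC;
    }

    @Override
    public Clock withZone(ZoneId zone) {
      return this; // zone changes are ignored in this sketch
    }

    @Override
    public Instant instant() {
      return now;
    }
  }

  public static void main(String[] args) {
    ManualClock clock = new ManualClock(Instant.parse("2019-05-01T10:15:00Z"));
    Function<byte[], Optional<Path>> trigger = hourlyTrigger(Paths.get("/tmp"), clock).get();
    System.out.println(trigger.apply(new byte[0])); // first element: a path for hour 10
    clock.set(Instant.parse("2019-05-01T10:45:00Z"));
    System.out.println(trigger.apply(new byte[0])); // same hour: Optional.empty
    clock.set(Instant.parse("2019-05-01T11:00:00Z"));
    System.out.println(trigger.apply(new byte[0])); // next hour: a path for hour 11
  }
}
```

In production code the same factory could be given Clock.systemDefaultZone(), which keeps the behaviour of the original example while leaving the rotation logic testable.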

Example: content-based rotation with compression to SFTP file

This example can be found in the self-contained example documentation section.
