Hadoop Distributed File System - HDFS

The connector offers Flows and Sources that interact with HDFS file systems.

For more information about Hadoop, please visit the Hadoop documentation.

Project Info: Alpakka HDFS
JDK versions: Eclipse Temurin JDK 11, Eclipse Temurin JDK 17
Scala versions: 2.13.12, 3.3.3
JPMS module name: akka.stream.alpakka.hdfs
Readiness level: Since 0.20, 2018-07-04
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Release notes: GitHub releases
Issues: GitHub issues


Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

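sbt:
resolvers += "Akka library repository".at("https://repo.akka.io/maven")

Maven (inside <repositories>):
<repository>
  <id>akka-repository</id>
  <name>Akka library repository</name>
  <url>https://repo.akka.io/maven</url>
</repository>

Gradle:
repositories {
    maven {
        url "https://repo.akka.io/maven"
    }
}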

Additionally, add the dependencies as below.

sbt:
val AkkaVersion = "2.9.3"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-hdfs" % "8.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)

Gradle:
def versions = [
  AkkaVersion: "2.9.3",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-hdfs_${versions.ScalaBinary}:8.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Specifying a Hadoop Version

By default, the HDFS connector uses Hadoop 3.3.6. If you use a different version of Hadoop, exclude the Hadoop libraries from the connector dependency and add the dependency for your preferred version.

Set up client

Flows provided by this connector need a prepared org.apache.hadoop.fs.FileSystem to interact with HDFS.

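Scala:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val conf = new Configuration()
// "fs.default.name" is the legacy key; newer Hadoop versions call it "fs.defaultFS"
conf.set("fs.default.name", "hdfs://localhost:54310")

val fs: FileSystem = FileSystem.get(conf)

Java:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://localhost:54310");

FileSystem fs = FileSystem.get(conf);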


Writing

The connector provides three Flows, all created through HdfsFlow. Each flow requires a RotationStrategy and a SyncStrategy to run.

The flows push OutgoingMessage elements downstream.

Data Writer

Use HdfsFlow.data to stream with FSDataOutputStream without any compression.

Scala:
val flow = HdfsFlow.data(
  fs,
  SyncStrategy.count(500),
  RotationStrategy.size(1, FileUnit.GB),
  settings
)

Java:
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.data(
        fs, SyncStrategy.count(500), RotationStrategy.size(1, FileUnit.GB()), settings);

Compressed Data Writer

First, create a CompressionCodec.

Scala:
val codec = new DefaultCodec()

Java:
DefaultCodec codec = new DefaultCodec();

Then, use HdfsFlow.compressed to stream with CompressionOutputStream and the CompressionCodec.

Scala:
val flow = HdfsFlow.compressed(
  fs,
  SyncStrategy.count(50),
  RotationStrategy.size(0.1, FileUnit.MB),
  codec,
  settings
)

Java:
Flow<HdfsWriteMessage<ByteString, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.compressed(
        fs, SyncStrategy.count(50), RotationStrategy.size(0.1, FileUnit.MB()), codec, settings);

Sequence Writer

Use HdfsFlow.sequence to stream a flat file consisting of binary key/value pairs.

Without Compression

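Scala:
// The sync strategy is not shown in this extract; SyncStrategy.none is an assumption.
val flow = HdfsFlow.sequence(
  fs, SyncStrategy.none, RotationStrategy.size(1, FileUnit.MB), settings, classOf[Text], classOf[Text])

Java:
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs, SyncStrategy.none(), // assumed sync strategy, see the note above
        RotationStrategy.size(1, FileUnit.MB()),
        settings, Text.class, Text.class);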

With Compression

First, define a codec.

Scala:
val codec = new DefaultCodec()

Java:
DefaultCodec codec = new DefaultCodec();

Then, create a flow.

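Scala:
// The sync strategy and compression type are not shown in this extract;
// SyncStrategy.none and CompressionType.BLOCK are assumptions.
val flow = HdfsFlow.sequence(
  fs, SyncStrategy.none, RotationStrategy.size(1, FileUnit.MB),
  CompressionType.BLOCK, codec, settings, classOf[Text], classOf[Text])

Java:
Flow<HdfsWriteMessage<Pair<Text, Text>, NotUsed>, RotationMessage, NotUsed> flow =
    HdfsFlow.sequence(
        fs, SyncStrategy.none(), // assumed sync strategy, see the note above
        RotationStrategy.size(1, FileUnit.MB()),
        CompressionType.BLOCK, codec, settings, Text.class, Text.class);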

Passing data through HdfsFlow

Use HdfsFlow.dataWithPassThrough, HdfsFlow.compressedWithPassThrough or HdfsFlow.sequenceWithPassThrough.

When streaming documents from Kafka, you might want to commit the offset back to Kafka once a document has been written. The pass-through flows emit two kinds of messages: a WrittenMessage for every input, and a RotationMessage whenever the file rotates.

Let’s say that we have these classes.

Scala:
case class Book(title: String)
case class KafkaOffset(offset: Int)
case class KafkaMessage(book: Book, offset: KafkaOffset)

Java:
public static class Book {
  final String title;

  Book(String title) {
    this.title = title;
  }
}

static class KafkaCommitter {
  List<Integer> committedOffsets = new ArrayList<>();

  void commit(KafkaOffset offset) {
    committedOffsets.add(offset.offset);
  }
}

static class KafkaOffset {
  final int offset;

  KafkaOffset(int offset) {
    this.offset = offset;
  }
}

static class KafkaMessage {
  final Book book;
  final KafkaOffset offset;

  KafkaMessage(Book book, KafkaOffset offset) {
    this.book = book;
    this.offset = offset;
  }
}

Then, we can stream with passThrough.

Scala:
// We're going to pretend we got messages from Kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka.
val messagesFromKafka = List(
  KafkaMessage(Book("Akka Concurrency"), KafkaOffset(0)),
  KafkaMessage(Book("Akka in Action"), KafkaOffset(1)),
  KafkaMessage(Book("Effective Akka"), KafkaOffset(2)),
  KafkaMessage(Book("Learning Scala"), KafkaOffset(3)),
  KafkaMessage(Book("Scala Puzzlers"), KafkaOffset(4)),
  KafkaMessage(Book("Scala for Spark in Production"), KafkaOffset(5))
)

var committedOffsets = List[KafkaOffset]()

def commitToKafka(offset: KafkaOffset): Unit =
  committedOffsets = committedOffsets :+ offset

val resF = Source(messagesFromKafka)
  .map { (kafkaMessage: KafkaMessage) =>
    val book = kafkaMessage.book
    // Transform message so that we can write to HDFS
    HdfsWriteMessage(ByteString(book.title), kafkaMessage.offset)
  }
  // `flow` stands for a pass-through flow built with HdfsFlow.dataWithPassThrough;
  // its construction is not shown in this extract.
  .via(flow)
  .map { message =>
    message match {
      case WrittenMessage(passThrough, _) =>
        // The element reached HDFS, so its offset can be committed
        commitToKafka(passThrough)
      case _ => ()
    }
    message
  }
  .collect {
    case rm: RotationMessage => rm
  }
  .runWith(Sink.seq)

Java:
// We're going to pretend we got messages from Kafka.
// After we've written them to HDFS, we want
// to commit the offset to Kafka.
List<KafkaMessage> messagesFromKafka =
    Arrays.asList(
        new KafkaMessage(new Book("Akka Concurrency"), new KafkaOffset(0)),
        new KafkaMessage(new Book("Akka in Action"), new KafkaOffset(1)),
        new KafkaMessage(new Book("Effective Akka"), new KafkaOffset(2)),
        new KafkaMessage(new Book("Learning Scala"), new KafkaOffset(3)),
        new KafkaMessage(new Book("Scala Puzzlers"), new KafkaOffset(4)),
        new KafkaMessage(new Book("Scala for Spark in Production"), new KafkaOffset(5)));

final KafkaCommitter kafkaCommitter = new KafkaCommitter();

// The arguments are not shown in this extract; syncStrategy, rotationStrategy and
// settings are placeholders for values configured as in the other sections.
Flow<HdfsWriteMessage<ByteString, KafkaOffset>, OutgoingMessage<KafkaOffset>, NotUsed> flow =
    HdfsFlow.dataWithPassThrough(fs, syncStrategy, rotationStrategy, settings);

CompletionStage<List<RotationMessage>> resF =
    Source.from(messagesFromKafka)
        .map(
            kafkaMessage -> {
              Book book = kafkaMessage.book;
              // Transform message so that we can write to HDFS
              return HdfsWriteMessage.create(
                  ByteString.fromString(book.title), kafkaMessage.offset);
            })
        .via(flow)
        .map(
            message -> {
              if (message instanceof WrittenMessage) {
                kafkaCommitter.commit(((WrittenMessage<KafkaOffset>) message).passThrough());
                return message;
              } else {
                return message;
              }
            })
        .collectType(RotationMessage.class) // Collect only rotation messages
        .runWith(Sink.seq(), system);


Configuration

The flows are configured through HdfsWritingSettings.

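Scala:
// Default settings; the options customised in the original snippet are not shown in this extract.
val settings = HdfsWritingSettings()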

File path generator

FilePathGenerator generates the HDFS path for each rotated file from the rotation count and a timestamp.

Scala:
val pathGenerator =
  FilePathGenerator(
    (rotationCount: Long, timestamp: Long) => s"/tmp/alpakka/$rotationCount-$timestamp"
  )

Java:
BiFunction<Long, Long, String> func =
    (rotationCount, timestamp) -> "/tmp/alpakka/" + rotationCount + "-" + timestamp;
FilePathGenerator pathGenerator = FilePathGenerator.create(func);
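
Because the path function is an ordinary BiFunction, it can be written and checked without a running HDFS cluster. The following is a minimal sketch in plain Java; the class PaddedPathExample and its method paddedPath are hypothetical names introduced here for illustration and are not part of the connector. It zero-pads the rotation count so that rotated files sort lexicographically, and it makes no assumption about the unit of the timestamp.

```java
import java.util.Locale;
import java.util.function.BiFunction;

// Hypothetical helper, not part of the connector.
final class PaddedPathExample {

    // Builds a (rotationCount, timestamp) -> path function whose rotation
    // count is zero-padded to five digits.
    static BiFunction<Long, Long, String> paddedPath(String baseDir) {
        return (rotationCount, timestamp) ->
            // Locale.ROOT keeps the digits plain ASCII regardless of the JVM locale
            String.format(Locale.ROOT, "%s/%05d-%d", baseDir, rotationCount, timestamp);
    }
}
```

Such a function could then be handed to FilePathGenerator.create in the same way as func above.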

Rotation Strategy

RotationStrategy decides when the connector closes the current file and starts a new one, for example by size with RotationStrategy.size.

Sync Strategy

SyncStrategy decides when the written output is synchronized to HDFS, for example by message count with SyncStrategy.count.
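
To make the interplay of the two strategies concrete, here is a conceptual model in plain Java. It is a sketch under stated assumptions, not the connector's implementation: the class SyncAndRotateSketch, its fields, and the order in which it checks sync and rotation are all hypothetical. It syncs after a fixed number of messages (in the spirit of SyncStrategy.count) and rotates once a byte threshold is reached (in the spirit of RotationStrategy.size).

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual model only; names and ordering are assumptions for illustration.
final class SyncAndRotateSketch {
    private final int syncEveryMessages; // plays the role of a count-based sync strategy
    private final long rotateAtBytes;    // plays the role of a size-based rotation strategy
    private int messagesSinceSync = 0;
    private long bytesInCurrentFile = 0;

    // Records what happened, in order, so the behaviour can be inspected.
    final List<String> events = new ArrayList<>();

    SyncAndRotateSketch(int syncEveryMessages, long rotateAtBytes) {
        this.syncEveryMessages = syncEveryMessages;
        this.rotateAtBytes = rotateAtBytes;
    }

    void write(byte[] payload) {
        bytesInCurrentFile += payload.length;
        messagesSinceSync++;
        events.add("write");

        // Sync once enough messages have accumulated since the last sync.
        if (messagesSinceSync >= syncEveryMessages) {
            events.add("sync");
            messagesSinceSync = 0;
        }

        // Rotate once the current file has reached the size threshold;
        // the new file starts with fresh counters.
        if (bytesInCurrentFile >= rotateAtBytes) {
            events.add("rotate");
            bytesInCurrentFile = 0;
            messagesSinceSync = 0;
        }
    }
}
```

In this model, a small sync interval bounds how much unsynced data can be lost on failure, while the rotation threshold bounds the size of each file. The two settings are independent of each other.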


Reading

Use HdfsSource to read from HDFS.

Data Reader

Scala:
val source = HdfsSource.data(fs, path)

Java:
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.data(fs, path);

Compressed Data Reader

Scala:
val source = HdfsSource.compressed(fs, path, codec)

Java:
Source<ByteString, CompletionStage<IOResult>> source = HdfsSource.compressed(fs, path, codec);

Sequence Reader

Scala:
val source = HdfsSource.sequence(fs, path, classOf[Text], classOf[Text])

Java:
Source<Pair<Text, Text>, NotUsed> source =
    HdfsSource.sequence(fs, path, Text.class, Text.class);

Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

> hdfs/testOnly *.HdfsWriterSpec
> hdfs/testOnly *.HdfsReaderSpec
> hdfs/testOnly *.HdfsWriterTest
> hdfs/testOnly *.HdfsReaderTest