Avro Parquet

The Avro Parquet connector provides an Akka Stream Source, Sink and Flow for push and pull data to and from Parquet files.

For more information about Apache Parquet please visit the official documentation.

[+] Show project info
Project Info: Alpakka Avro Parquet
Artifact
com.lightbend.akka
akka-stream-alpakka-avroparquet
9.0.0
JDK versions
Eclipse Temurin JDK 11
Eclipse Temurin JDK 17
Scala versions2.13.12, 3.3.3
JPMS module nameakka.stream.alpakka.avroparquet
License
Readiness level
Since 1.0-M1, 2018-11-06
Home pagehttps://doc.akka.io/libraries/alpakka/current
API documentation
Forums
Release notesGitHub releases
IssuesGithub issues
Sourceshttps://github.com/akka/alpakka

Artifacts

The Akka dependencies are available from Akka’s library repository. To access them there, you need to configure the URL for this repository.

sbt
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
Maven
<project>
  ...
  <repositories>
    <repository>
      <id>akka-repository</id>
      <name>Akka library repository</name>
      <url>https://repo.akka.io/maven</url>
    </repository>
  </repositories>
</project>
Gradle
repositories {
    mavenCentral()
    maven {
        url "https://repo.akka.io/maven"
    }
}

Additionally, add the dependencies as below.

sbt
val AkkaVersion = "2.10.0"
libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-avroparquet" % "9.0.0",
  "com.typesafe.akka" %% "akka-stream" % AkkaVersion
)
Maven
<properties>
  <akka.version>2.10.0</akka.version>
  <scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-avroparquet_${scala.binary.version}</artifactId>
    <version>9.0.0</version>
  </dependency>
  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-stream_${scala.binary.version}</artifactId>
    <version>${akka.version}</version>
  </dependency>
</dependencies>
Gradle
def versions = [
  AkkaVersion: "2.10.0",
  ScalaBinary: "2.13"
]
dependencies {
  implementation "com.lightbend.akka:akka-stream-alpakka-avroparquet_${versions.ScalaBinary}:9.0.0"
  implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
}

The table below shows direct dependencies of this module and the second tab shows all libraries it depends on transitively.

Direct dependencies
OrganizationArtifactVersion
com.typesafe.akkaakka-stream_2.132.10.0
org.apache.avroavro1.12.0
org.apache.parquetparquet-avro1.14.2
org.scala-langscala-library2.13.12
Dependency tree
com.typesafe.akka    akka-stream_2.13    2.10.0    BUSL-1.1
    com.typesafe.akka    akka-actor_2.13    2.10.0    BUSL-1.1
        com.typesafe    config    1.4.3    Apache-2.0
        org.scala-lang    scala-library    2.13.12    Apache-2.0
    com.typesafe.akka    akka-protobuf-v3_2.13    2.10.0    BUSL-1.1
    org.reactivestreams    reactive-streams    1.0.4    MIT-0
    org.scala-lang    scala-library    2.13.12    Apache-2.0
org.apache.avro    avro    1.12.0
    com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
    com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
    org.apache.commons    commons-compress    1.26.2
        commons-codec    commons-codec    1.17.0
        commons-io    commons-io    2.16.1
        org.apache.commons    commons-lang3    3.14.0
    org.slf4j    slf4j-api    2.0.13
org.apache.parquet    parquet-avro    1.14.2
    org.apache.avro    avro    1.12.0
        com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
        com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
        org.apache.commons    commons-compress    1.26.2
            commons-codec    commons-codec    1.17.0
            commons-io    commons-io    2.16.1
            org.apache.commons    commons-lang3    3.14.0
        org.slf4j    slf4j-api    2.0.13
    org.apache.parquet    parquet-column    1.14.2
        org.apache.parquet    parquet-common    1.14.2
            org.apache.parquet    parquet-format-structures    1.14.2
                javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
            org.slf4j    slf4j-api    2.0.13
        org.apache.parquet    parquet-encoding    1.14.2
            org.apache.parquet    parquet-common    1.14.2
                org.apache.parquet    parquet-format-structures    1.14.2
                    javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
                org.slf4j    slf4j-api    2.0.13
            org.slf4j    slf4j-api    2.0.13
        org.slf4j    slf4j-api    2.0.13
    org.apache.parquet    parquet-common    1.14.2
        org.apache.parquet    parquet-format-structures    1.14.2
            javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
        org.slf4j    slf4j-api    2.0.13
    org.apache.parquet    parquet-hadoop    1.14.2
        com.fasterxml.jackson.datatype    jackson-datatype-jdk8    2.17.0
            com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
            com.fasterxml.jackson.core    jackson-databind    2.17.2    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-annotations    2.17.2    The Apache Software License, Version 2.0
                com.fasterxml.jackson.core    jackson-core    2.17.2    The Apache Software License, Version 2.0
        com.github.luben    zstd-jni    1.5.6-2    BSD 2-Clause License
        commons-pool    commons-pool    1.6
        io.airlift    aircompressor    0.27    Apache License 2.0
        org.apache.parquet    parquet-column    1.14.2
            org.apache.parquet    parquet-common    1.14.2
                org.apache.parquet    parquet-format-structures    1.14.2
                    javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
                org.slf4j    slf4j-api    2.0.13
            org.apache.parquet    parquet-encoding    1.14.2
                org.apache.parquet    parquet-common    1.14.2
                    org.apache.parquet    parquet-format-structures    1.14.2
                        javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
                    org.slf4j    slf4j-api    2.0.13
                org.slf4j    slf4j-api    2.0.13
            org.slf4j    slf4j-api    2.0.13
        org.apache.parquet    parquet-common    1.14.2
            org.apache.parquet    parquet-format-structures    1.14.2
                javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
            org.slf4j    slf4j-api    2.0.13
        org.apache.parquet    parquet-format-structures    1.14.2
            javax.annotation    javax.annotation-api    1.3.2    CDDL + GPLv2 with classpath exception
        org.apache.parquet    parquet-jackson    1.14.2
        org.slf4j    slf4j-api    2.0.13
        org.xerial.snappy    snappy-java    1.1.10.5    Apache-2.0
    org.slf4j    slf4j-api    2.0.13
org.scala-lang    scala-library    2.13.12    Apache-2.0

Source Initiation

Sometimes it might be useful to use a Parquet file as stream Source. For this we will need to create an AvroParquetReader instance which will produce records as subtypes of GenericRecord, the Avro record’s abstract representation.

Scala
sourceimport org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.AvroReadSupport

val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader: ParquetReader[GenericRecord] =
  AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
Java
sourceimport org.apache.parquet.hadoop.ParquetReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import akka.stream.javadsl.Source;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.util.HadoopOutputFile;

Configuration conf = new Configuration();

ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
        .disableCompatibility()
        .build();

After that, you can create the Parquet Source from the initialisation of AvroParquetReader. This object requires an instance of a org.apache.parquet.hadoop.ParquetReader typed by a subtype of GenericRecord.

Scala
sourceval source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
Java
sourceSource<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);

Sink Initiation

On the other hand, you can use AvroParquetWriter as the Akka Streams Sink implementation for writing to Parquet. In that case, its initialisation would require an instance of org.apache.parquet.hadoop.ParquetWriter. It will also expect any subtype of GenericRecord to be passed.

Scala
sourceimport com.sksamuel.avro4s.Record
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroReadSupport

val file: String = "./sample/path/test.parquet"
val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer: ParquetWriter[Record] = AvroParquetWriter
  .builder[Record](HadoopOutputFile.fromPath(new Path(file), conf))
  .withConf(conf)
  .withSchema(schema)
  .build()
Java
sourceimport org.apache.parquet.hadoop.ParquetWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.util.HadoopInputFile;

Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(new Path(file), conf))
        .withConf(conf)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withSchema(schema)
        .build();

After that, the AvroParquet Sink can already be used.

Scala
sourceval records: List[Record] = documents.map(RecordFormat[Document].to(_))
val source: Source[Record, NotUsed] = Source(records)
val result: Future[Done] = source
  .runWith(AvroParquetSink(writer))
Java
sourceSink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);

Flow Initiation

The representation of a ParquetWriter as a Flow is also available to use as a streams flow stage, in which as well as the other representations, it will expect subtypes of the Parquet GenericRecord type to be passed. As a result, it writes into a Parquet file and returns the same GenericRecords. Such a Flow stage can be easily created by using the AvroParquetFlow and providing an AvroParquetWriter instance as a parameter.

Scala
sourceval records: List[GenericRecord]
val source: Source[GenericRecord, NotUsed] = Source(records)
val avroParquet: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)
val result =
  source
    .via(avroParquet)
    .runWith(Sink.seq)
This is all the preparation that we are going to need.
Java
sourceParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(HadoopOutputFile.fromPath(new Path("./test.parquet"), conf))
        .withConf(conf)
        .withSchema(schema)
        .build();

Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);

source.via(flow).runWith(Sink.ignore(), system);

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
sbt
> avroparquet/test
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.