Avro Parquet

The Avro Parquet connector provides an Akka Stream Source, Sink and Flow for pushing data to and pulling data from Parquet files.

For more information about Apache Parquet please visit the official documentation.

Project Info: Alpakka Avro Parquet
JDK versions: OpenJDK 8
Scala versions: 2.12.7, 2.11.12, 2.13.0
JPMS module name: akka.stream.alpakka.avroparquet
Readiness level: since 1.0-M1, 2018-11-06
Home page: https://doc.akka.io/docs/alpakka/current
API documentation
Release notes: in the documentation
Issues: GitHub issues


sbt
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-avroparquet" % "1.1.2"

Gradle
dependencies {
  compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-avroparquet_2.12', version: '1.1.2'
}

The first table below lists the direct dependencies of this module; the dependency tree after it shows all libraries it depends on transitively.

Direct dependencies
Organization    Artifact    Version    License
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
org.apache.parquet    parquet-avro    1.10.0    The Apache Software License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause
Dependency tree
com.typesafe.akka    akka-stream_2.12    2.5.23    Apache License, Version 2.0
    com.typesafe.akka    akka-actor_2.12    2.5.23    Apache License, Version 2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-java8-compat_2.12    0.8.0    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe.akka    akka-protobuf_2.12    2.5.23    Apache License, Version 2.0
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    com.typesafe    ssl-config-core_2.12    0.3.7    Apache-2.0
        com.typesafe    config    1.3.3    Apache License, Version 2.0
        org.scala-lang.modules    scala-parser-combinators_2.12    1.1.1    BSD 3-clause
            org.scala-lang    scala-library    2.12.7    BSD 3-Clause
        org.scala-lang    scala-library    2.12.7    BSD 3-Clause
    org.reactivestreams    reactive-streams    1.0.2    CC0
    org.scala-lang    scala-library    2.12.7    BSD 3-Clause
org.apache.parquet    parquet-avro    1.10.0    The Apache Software License, Version 2.0
    it.unimi.dsi    fastutil    7.0.13    Apache License, Version 2.0
    org.apache.avro    avro    1.8.2    The Apache Software License, Version 2.0
        com.thoughtworks.paranamer    paranamer    2.7    BSD
        org.apache.commons    commons-compress    1.8.1    The Apache Software License, Version 2.0
        org.codehaus.jackson    jackson-core-asl    1.9.13    The Apache Software License, Version 2.0
        org.codehaus.jackson    jackson-mapper-asl    1.9.13    The Apache Software License, Version 2.0
            org.codehaus.jackson    jackson-core-asl    1.9.13    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    1.7.22    MIT License
        org.tukaani    xz    1.5    Public Domain
        org.xerial.snappy    snappy-java    The Apache Software License, Version 2.0
    org.apache.parquet    parquet-column    1.10.0    The Apache Software License, Version 2.0
        commons-codec    commons-codec    1.10    Apache License, Version 2.0
        org.apache.parquet    parquet-common    1.10.0    The Apache Software License, Version 2.0
            org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
                org.slf4j    slf4j-api    1.7.22    MIT License
            org.slf4j    slf4j-api    1.7.22    MIT License
        org.apache.parquet    parquet-encoding    1.10.0    The Apache Software License, Version 2.0
            commons-codec    commons-codec    1.10    Apache License, Version 2.0
            org.apache.parquet    parquet-common    1.10.0    The Apache Software License, Version 2.0
                org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
                    org.slf4j    slf4j-api    1.7.22    MIT License
                org.slf4j    slf4j-api    1.7.22    MIT License
    org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
        org.slf4j    slf4j-api    1.7.22    MIT License
    org.apache.parquet    parquet-hadoop    1.10.0    The Apache Software License, Version 2.0
        commons-pool    commons-pool    1.6    The Apache Software License, Version 2.0
        org.apache.parquet    parquet-column    1.10.0    The Apache Software License, Version 2.0
            commons-codec    commons-codec    1.10    Apache License, Version 2.0
            org.apache.parquet    parquet-common    1.10.0    The Apache Software License, Version 2.0
                org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
                    org.slf4j    slf4j-api    1.7.22    MIT License
                org.slf4j    slf4j-api    1.7.22    MIT License
            org.apache.parquet    parquet-encoding    1.10.0    The Apache Software License, Version 2.0
                commons-codec    commons-codec    1.10    Apache License, Version 2.0
                org.apache.parquet    parquet-common    1.10.0    The Apache Software License, Version 2.0
                    org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
                        org.slf4j    slf4j-api    1.7.22    MIT License
                    org.slf4j    slf4j-api    1.7.22    MIT License
        org.apache.parquet    parquet-format    2.4.0    The Apache Software License, Version 2.0
            org.slf4j    slf4j-api    1.7.22    MIT License
        org.apache.parquet    parquet-jackson    1.10.0    The Apache Software License, Version 2.0
        org.codehaus.jackson    jackson-core-asl    1.9.13    The Apache Software License, Version 2.0
        org.codehaus.jackson    jackson-mapper-asl    1.9.13    The Apache Software License, Version 2.0
            org.codehaus.jackson    jackson-core-asl    1.9.13    The Apache Software License, Version 2.0
        org.xerial.snappy    snappy-java    The Apache Software License, Version 2.0
org.scala-lang    scala-library    2.12.7    BSD 3-Clause

Source Initiation

We will need an ActorSystem and an ActorMaterializer.

implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);

Sometimes it is useful to use a Parquet file as a stream Source. For this, build a ParquetReader with AvroParquetReader; the reader produces Avro GenericRecord instances.

import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.avro.AvroReadSupport
val file = folder + "/test.parquet" // folder: the directory holding the Parquet file, defined elsewhere

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader: ParquetReader[GenericRecord] =
  AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import akka.stream.javadsl.Source;
import org.apache.parquet.avro.AvroParquetReader;
Configuration conf = new Configuration();

// HadoopInputFile.fromPath and build() may throw IOException
ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
        .withConf(conf)
        .build();

Then create the Source, passing the reader built with AvroParquetReader as its parameter:

val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
Source<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);
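The following is a minimal end-to-end sketch of reading with the Scala API. It assumes the versions listed on this page (Alpakka 1.1.2, Akka 2.5, parquet-avro 1.10.0) and a Hadoop client library on your classpath, since no Hadoop artifact appears in this module's dependency tree even though the snippets import org.apache.hadoop classes. To have something to read, it first writes three records with a plain ParquetWriter. The Document schema, the temporary file and the ReadParquetSketch object are hypothetical and exist only for illustration.

```scala
import java.nio.file.Files

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSource
import akka.stream.scaladsl.{Sink, Source}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical sketch: write a small Parquet file, then stream it back through AvroParquetSource.
object ReadParquetSketch {
  // Hypothetical two-field schema, used only for this example.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Document", "fields": [
      |  {"name": "id", "type": "string"},
      |  {"name": "body", "type": "string"}
      |]}""".stripMargin
  )

  /** Returns the "id" field of every record emitted by the Source, in file order. */
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val conf = new Configuration()
      val file = Files.createTempDirectory("avroparquet").resolve("test.parquet").toString

      // Prepare the input file synchronously with a plain ParquetWriter (no streaming yet).
      val writer = AvroParquetWriter
        .builder[GenericRecord](new Path(file))
        .withConf(conf)
        .withSchema(schema)
        .build()
      try {
        for (i <- 1 to 3)
          writer.write(new GenericRecordBuilder(schema).set("id", i.toString).set("body", s"doc $i").build())
      } finally writer.close()

      // The reader is all AvroParquetSource needs; the Source emits one GenericRecord per row.
      val reader: ParquetReader[GenericRecord] =
        AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
      val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)

      val records = Await.result(source.runWith(Sink.seq), 10.seconds)
      // String fields may be read back as org.apache.avro.util.Utf8, hence toString.
      records.map(_.get("id").toString)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Collecting into Sink.seq is only convenient for a small file; for large files you would keep processing the records inside the stream instead.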

Sink Initiation

Sometimes it is useful to use a Parquet file as an Akka Streams Sink, for instance when you need to store data as Parquet files on HDFS (or any other distributed file system) and later run map-reduce jobs on it. For this, first create an AvroParquetWriter instance that accepts GenericRecord.
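The writer snippets that follow pass a `schema` value without defining it. As a hedged illustration, the sketch below defines a hypothetical Avro record schema named Document and builds GenericRecord values for it with Avro's GenericRecordBuilder. The schema, its fields and the DocumentRecordsSketch object are assumptions made for this example, not part of the connector; only the Avro library (a transitive dependency of this module) is needed.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}

// Hypothetical example: an Avro schema plus GenericRecords that an AvroParquetWriter could accept.
object DocumentRecordsSketch {
  // Hypothetical "Document" record schema with two required string fields.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Document", "fields": [
      |  {"name": "id", "type": "string"},
      |  {"name": "body", "type": "string"}
      |]}""".stripMargin
  )

  // GenericRecordBuilder.build() throws if a field without a default value was never set.
  def record(id: String, body: String): GenericRecord =
    new GenericRecordBuilder(schema)
      .set("id", id)
      .set("body", body)
      .build()

  // Builds one GenericRecord per (id, body) pair.
  def toRecords(docs: Seq[(String, String)]): Seq[GenericRecord] =
    docs.map { case (id, body) => record(id, body) }
}
```

A schema like this one is what `withSchema(schema)` expects; the same schema is stored in the Parquet file and used when reading it back.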

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
val file = folder + "/test.parquet" // folder: the target directory, defined elsewhere

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

// schema: the org.apache.avro.Schema of the records to write, defined elsewhere
val writer: ParquetWriter[GenericRecord] =
  AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
// file (the target path) and schema (the records' Avro Schema) are defined elsewhere
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path(file))
        .withConf(conf)
        .withSchema(schema)
        .build();

Then create the Sink, passing the writer built with AvroParquetWriter as its parameter.

val sink: Sink[GenericRecord, Future[Done]] = AvroParquetSink(writer)
Sink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);
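Here is a minimal end-to-end sketch of the Sink in Scala, under the same assumptions as before: the versions listed on this page plus a Hadoop client library that you add to your own build. It streams three records into AvroParquetSink and then reads the file back with a plain ParquetReader. Reading the file as soon as the materialized Future completes relies on the sink closing the writer when the stream finishes; that is our understanding of the 1.x stage, not something this page states, so treat it as an assumption. The schema, the temporary file and the WriteParquetSketch object are hypothetical.

```scala
import java.nio.file.Files

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink
import akka.stream.scaladsl.{Sink, Source}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter}
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.util.HadoopInputFile

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical sketch: stream records into AvroParquetSink, then read the file back.
object WriteParquetSketch {
  // Hypothetical two-field schema, used only for this example.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Document", "fields": [
      |  {"name": "id", "type": "string"},
      |  {"name": "body", "type": "string"}
      |]}""".stripMargin
  )

  private def record(id: String, body: String): GenericRecord =
    new GenericRecordBuilder(schema).set("id", id).set("body", body).build()

  /** Returns the "id" values found in the written file, in file order. */
  def run(): List[String] = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val conf = new Configuration()
      val file = Files.createTempDirectory("avroparquet").resolve("test.parquet").toString

      val writer: ParquetWriter[GenericRecord] =
        AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()
      val sink: Sink[GenericRecord, Future[Done]] = AvroParquetSink(writer)

      val records: List[GenericRecord] = (1 to 3).map(i => record(i.toString, s"doc $i")).toList
      // Assumption: the stage closes the writer when the stream completes, so the file is final here.
      Await.result(Source(records).runWith(sink), 10.seconds)

      // Read back with a plain ParquetReader; read() returns null once the file is exhausted.
      val reader =
        AvroParquetReader.builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf)).withConf(conf).build()
      try Iterator.continually(reader.read()).takeWhile(_ != null).map(_.get("id").toString).toList
      finally reader.close()
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```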

Flow Initiation

It can also be useful to use a ParquetWriter as a Flow stage that accepts GenericRecord elements, writes each one to the Parquet file, and emits the same GenericRecords downstream. Create such a stage with AvroParquetFlow, passing the writer built with AvroParquetWriter as its parameter.

val flow: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)

// docToRecord: a helper, not shown here, that converts each source element to a GenericRecord
val result = source
  .map(f => docToRecord(f))
  .via(flow)
  .runWith(Sink.ignore)
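// conf as in the Sink section; schema is the records' Avro Schema, defined elsewhere
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path("./test.parquet"))
        .withConf(conf)
        .withSchema(schema)
        .build();

Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);

source.via(flow).runWith(Sink.ignore(), materializer);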
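The Flow variant can be sketched the same way in Scala. Because AvroParquetFlow emits each record after writing it, collecting its output with Sink.seq shows the records passing through in order. As before, this assumes the versions on this page and a Hadoop client library on the classpath; the schema, the temporary file and the FlowParquetSketch object are hypothetical.

```scala
import java.nio.file.Files

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetWriter

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical sketch: write records through AvroParquetFlow and collect what it emits downstream.
object FlowParquetSketch {
  // Hypothetical two-field schema, used only for this example.
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "Document", "fields": [
      |  {"name": "id", "type": "string"},
      |  {"name": "body", "type": "string"}
      |]}""".stripMargin
  )

  private def record(id: String, body: String): GenericRecord =
    new GenericRecordBuilder(schema).set("id", id).set("body", body).build()

  /** Returns the "id" values emitted by the flow, in stream order. */
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val conf = new Configuration()
      val file = Files.createTempDirectory("avroparquet").resolve("test.parquet").toString

      val writer: ParquetWriter[GenericRecord] =
        AvroParquetWriter.builder[GenericRecord](new Path(file)).withConf(conf).withSchema(schema).build()
      val flow: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)

      val records: List[GenericRecord] = (1 to 3).map(i => record(i.toString, s"doc $i")).toList
      // Each element is written to the file and then pushed downstream to Sink.seq.
      val emitted = Await.result(Source(records).via(flow).runWith(Sink.seq), 10.seconds)
      emitted.map(_.get("id").toString)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Compared with AvroParquetSink, the Flow keeps the written records available to later stages instead of discarding them.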

Running the example code

The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt:

> avroparquet/test