Avro Parquet
The Avro Parquet connector provides Akka Stream Source, Sink and Flow stages for pushing data to and pulling data from Apache Parquet files.
For more information about Apache Parquet, please visit the official documentation.
| Project Info: Alpakka Avro Parquet | |
|---|---|
| Artifact | com.lightbend.akka : akka-stream-alpakka-avroparquet : 1.0.2 |
| JDK versions | OpenJDK 8 |
| Scala versions | 2.12.7, 2.11.12, 2.13.0-M5 |
| JPMS module name | akka.stream.alpakka.avroparquet |
| Readiness level | Since 1.0-M1, 2018-11-06 |
| Home page | https://doc.akka.io/docs/alpakka/current/ |
| Release notes | In the documentation |
| Issues | GitHub issues |
| Sources | https://github.com/akka/alpakka |
Artifacts
- sbt

  ```scala
  libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-avroparquet" % "1.0.2"
  ```

- Maven

  ```xml
  <dependency>
    <groupId>com.lightbend.akka</groupId>
    <artifactId>akka-stream-alpakka-avroparquet_2.12</artifactId>
    <version>1.0.2</version>
  </dependency>
  ```

- Gradle

  ```gradle
  dependencies {
    compile group: 'com.lightbend.akka', name: 'akka-stream-alpakka-avroparquet_2.12', version: '1.0.2'
  }
  ```
The first table below shows the direct dependencies of this module; the dependency tree below it shows all libraries it depends on transitively.
- Direct dependencies

| Organization | Artifact | Version | License |
|---|---|---|---|
| com.typesafe.akka | akka-stream_2.12 | 2.5.22 | Apache License, Version 2.0 |
| org.apache.parquet | parquet-avro | 1.10.0 | The Apache Software License, Version 2.0 |
| org.scala-lang | scala-library | 2.12.7 | BSD 3-Clause |

- Dependency tree
```
com.typesafe.akka  akka-stream_2.12  2.5.22  [Apache License, Version 2.0]
  com.typesafe.akka  akka-actor_2.12  2.5.22  [Apache License, Version 2.0]
    com.typesafe  config  1.3.3  [Apache License, Version 2.0]
    org.scala-lang.modules  scala-java8-compat_2.12  0.8.0  [BSD 3-clause]
      org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
    org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
  com.typesafe.akka  akka-protobuf_2.12  2.5.22  [Apache License, Version 2.0]
    org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
  com.typesafe  ssl-config-core_2.12  0.3.7  [Apache-2.0]
    com.typesafe  config  1.3.3  [Apache License, Version 2.0]
    org.scala-lang.modules  scala-parser-combinators_2.12  1.1.1  [BSD 3-clause]
      org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
    org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
  org.reactivestreams  reactive-streams  1.0.2  [CC0]
  org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
org.apache.parquet  parquet-avro  1.10.0  [The Apache Software License, Version 2.0]
  it.unimi.dsi  fastutil  7.0.13  [Apache License, Version 2.0]
  org.apache.avro  avro  1.8.2  [The Apache Software License, Version 2.0]
    com.thoughtworks.paranamer  paranamer  2.7  [BSD]
    org.apache.commons  commons-compress  1.8.1  [The Apache Software License, Version 2.0]
    org.codehaus.jackson  jackson-core-asl  1.9.13  [The Apache Software License, Version 2.0]
    org.codehaus.jackson  jackson-mapper-asl  1.9.13  [The Apache Software License, Version 2.0]
      org.codehaus.jackson  jackson-core-asl  1.9.13  [The Apache Software License, Version 2.0]
    org.slf4j  slf4j-api  1.7.22  [MIT License]
    org.tukaani  xz  1.5  [Public Domain]
    org.xerial.snappy  snappy-java  1.1.2.6  [The Apache Software License, Version 2.0]
  org.apache.parquet  parquet-column  1.10.0  [The Apache Software License, Version 2.0]
    commons-codec  commons-codec  1.10  [Apache License, Version 2.0]
    org.apache.parquet  parquet-common  1.10.0  [The Apache Software License, Version 2.0]
      org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
        org.slf4j  slf4j-api  1.7.22  [MIT License]
      org.slf4j  slf4j-api  1.7.22  [MIT License]
    org.apache.parquet  parquet-encoding  1.10.0  [The Apache Software License, Version 2.0]
      commons-codec  commons-codec  1.10  [Apache License, Version 2.0]
      org.apache.parquet  parquet-common  1.10.0  [The Apache Software License, Version 2.0]
        org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
          org.slf4j  slf4j-api  1.7.22  [MIT License]
        org.slf4j  slf4j-api  1.7.22  [MIT License]
      org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
        org.slf4j  slf4j-api  1.7.22  [MIT License]
  org.apache.parquet  parquet-hadoop  1.10.0  [The Apache Software License, Version 2.0]
    commons-pool  commons-pool  1.6  [The Apache Software License, Version 2.0]
    org.apache.parquet  parquet-column  1.10.0  [The Apache Software License, Version 2.0]
      commons-codec  commons-codec  1.10  [Apache License, Version 2.0]
      org.apache.parquet  parquet-common  1.10.0  [The Apache Software License, Version 2.0]
        org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
          org.slf4j  slf4j-api  1.7.22  [MIT License]
        org.slf4j  slf4j-api  1.7.22  [MIT License]
      org.apache.parquet  parquet-encoding  1.10.0  [The Apache Software License, Version 2.0]
        commons-codec  commons-codec  1.10  [Apache License, Version 2.0]
        org.apache.parquet  parquet-common  1.10.0  [The Apache Software License, Version 2.0]
          org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
            org.slf4j  slf4j-api  1.7.22  [MIT License]
          org.slf4j  slf4j-api  1.7.22  [MIT License]
        org.apache.parquet  parquet-format  2.4.0  [The Apache Software License, Version 2.0]
          org.slf4j  slf4j-api  1.7.22  [MIT License]
    org.apache.parquet  parquet-jackson  1.10.0  [The Apache Software License, Version 2.0]
      org.codehaus.jackson  jackson-core-asl  1.9.13  [The Apache Software License, Version 2.0]
      org.codehaus.jackson  jackson-mapper-asl  1.9.13  [The Apache Software License, Version 2.0]
        org.codehaus.jackson  jackson-core-asl  1.9.13  [The Apache Software License, Version 2.0]
    org.xerial.snappy  snappy-java  1.1.2.6  [The Apache Software License, Version 2.0]
org.scala-lang  scala-library  2.12.7  [BSD 3-Clause]
```
Source Initiation
We will need an ActorSystem and an ActorMaterializer.
- Scala
-
```scala
implicit val system: ActorSystem = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
```
- Java
-
```java
ActorSystem system = ActorSystem.create();
ActorMaterializer materializer = ActorMaterializer.create(system);
```
Sometimes it might be useful to use a Parquet file as a stream Source. For this, we need to create an AvroParquetReader instance, which produces Parquet GenericRecord instances.
- Scala
-
```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val file = folder + "/test.parquet"
val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader: ParquetReader[GenericRecord] =
  AvroParquetReader
    .builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf))
    .withConf(conf)
    .build()
```
- Java
-
```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

Configuration conf = new Configuration();
ParquetReader<GenericRecord> reader =
    AvroParquetReader.<GenericRecord>builder(
            HadoopInputFile.fromPath(new Path("./test.parquet"), conf))
        .disableCompatibility()
        .build();
```
After that, you can create your Source, which accepts the AvroParquetReader instance as a parameter:
- Scala
-
```scala
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
```
- Java
-
```java
Source<GenericRecord, NotUsed> source = AvroParquetSource.create(reader);
```
Sink Initiation
Sometimes it might be useful to use a Parquet file as an Akka Stream Sink, for instance when you need to store data in Parquet files on HDFS (or any other distributed file system) and run map-reduce jobs on it later. For this, we first need to create an AvroParquetWriter instance, which accepts GenericRecord instances.
- Scala
-
```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.ParquetWriter

val file = folder + "/test.parquet"
val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer: ParquetWriter[GenericRecord] =
  AvroParquetWriter
    .builder[GenericRecord](new Path(file))
    .withConf(conf)
    .withSchema(schema)
    .build()
```
- Java
-
```java
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

Configuration conf = new Configuration();
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path(file))
        .withConf(conf)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .withSchema(schema)
        .build();
```
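Note that the writer snippets above assume a `schema` value of type `org.apache.avro.Schema` defined elsewhere in the enclosing test. As a purely hypothetical illustration (the record and field names below are not part of this project), such a schema could be declared in Avro's JSON schema syntax and loaded with `new Schema.Parser().parse(...)`:

```json
{
  "type": "record",
  "name": "Document",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "body", "type": "string"}
  ]
}
```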
After that, you can create a Sink which accepts the AvroParquetWriter instance as a parameter.
- Scala
-
```scala
val sink: Sink[GenericRecord, Future[Done]] = AvroParquetSink(writer)
```
- Java
-
```java
Sink<GenericRecord, CompletionStage<Done>> sink = AvroParquetSink.create(writer);
```
Flow Initiation
It might be useful to use a ParquetWriter as a Flow stage in a stream, which accepts Parquet GenericRecord instances, writes them to a Parquet file, and emits the same GenericRecord instances downstream. Such a Flow stage can be created from an AvroParquetFlow instance, providing the AvroParquetWriter instance as a parameter.
- Scala
-
This is all the preparation that we are going to need.

```scala
val flow: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)

val result = source
  .map(f => docToRecord(f))
  .via(flow)
  .runWith(Sink.ignore)
```
- Java
-
```java
ParquetWriter<GenericRecord> writer =
    AvroParquetWriter.<GenericRecord>builder(new Path("./test.parquet"))
        .withConf(conf)
        .withSchema(schema)
        .build();

Flow<GenericRecord, GenericRecord, NotUsed> flow = AvroParquetFlow.create(writer);

source.via(flow).runWith(Sink.ignore(), materializer);
```
Running the example code
The code in this guide is part of the runnable tests of this project. You are welcome to edit the code and run it in sbt.
- Scala
-
```
sbt
> avroparquet/test
```