Sometimes it might be useful to use a Parquet file as a stream source. For this we need to create an AvroParquetReader instance, which produces records as subtypes of GenericRecord, Avro's abstract record representation.
```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.hadoop.ParquetReader
import org.apache.parquet.hadoop.util.HadoopInputFile

val file: String = "./sample/path/test.parquet"
val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val reader: ParquetReader[GenericRecord] =
  AvroParquetReader
    .builder[GenericRecord](HadoopInputFile.fromPath(new Path(file), conf))
    .withConf(conf)
    .build()
```
After that, you can create the Parquet source from the initialised AvroParquetReader. The source requires an instance of org.apache.parquet.hadoop.ParquetReader typed with a subtype of GenericRecord.
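For instance, a minimal sketch of wiring the reader built above into a source (assuming the Alpakka avroparquet module is on the classpath):

```scala
import akka.NotUsed
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSource
import akka.stream.scaladsl.Source
import org.apache.avro.generic.GenericRecord

// Wrap the ParquetReader built above into an Akka Streams source of records.
val source: Source[GenericRecord, NotUsed] = AvroParquetSource(reader)
```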
Conversely, you can use an AvroParquetWriter behind the Akka Streams Sink implementation for writing to Parquet. In that case, initialisation requires an instance of org.apache.parquet.hadoop.ParquetWriter, and the sink likewise expects subtypes of GenericRecord to be passed.
```scala
import com.sksamuel.avro4s.Record
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.util.HadoopOutputFile

val file: String = "./sample/path/test.parquet"
val conf: Configuration = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)
val writer: ParquetWriter[Record] =
  AvroParquetWriter
    .builder[Record](HadoopOutputFile.fromPath(new Path(file), conf))
    .withConf(conf)
    .withSchema(schema) // Avro schema for the records; see the sketch below
    .build()
```
After that, the AvroParquet sink is ready to use.
The Scala example below demonstrates that any subtype of GenericRecord can be passed to the stream. In this case the one used is com.sksamuel.avro4s.Record, which implements the Avro GenericRecord interface. See Avro4s or Avrohugger for other ways of generating these classes.
```scala
import akka.{Done, NotUsed}
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetSink
import akka.stream.scaladsl.Source
import com.sksamuel.avro4s.{Record, RecordFormat}

import scala.concurrent.Future

// Convert the domain objects to avro4s Records and stream them into the sink.
val records: List[Record] = documents.map(RecordFormat[Document].to(_))
val source: Source[Record, NotUsed] = Source(records)
val result: Future[Done] = source.runWith(AvroParquetSink(writer))
```
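The Document type, the documents list, and the schema referenced above are left undefined in the snippets. A hypothetical definition using avro4s could look like this; the case class and its fields are assumptions for illustration:

```scala
import com.sksamuel.avro4s.AvroSchema
import org.apache.avro.Schema

// Hypothetical domain type; any case class avro4s can derive a schema for will do.
case class Document(id: String, body: String)

// avro4s derives the Avro schema that the writer builder expects.
val schema: Schema = AvroSchema[Document]

// Sample data for the example above.
val documents: List[Document] = List(Document("id-1", "first"), Document("id-2", "second"))
```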
A ParquetWriter can also be represented as a Flow stage. Like the other stages, it expects subtypes of Avro's GenericRecord to be passed; it writes each incoming record to the Parquet file and then emits the same record downstream. Such a Flow stage is created with AvroParquetFlow, providing an AvroParquetWriter instance as a parameter.
```scala
import akka.NotUsed
import akka.stream.alpakka.avroparquet.scaladsl.AvroParquetFlow
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.avro.generic.GenericRecord

val records: List[GenericRecord] = ??? // initialisation elided in the original snippet
val source: Source[GenericRecord, NotUsed] = Source(records)
// The flow writes each record to the Parquet file and emits it downstream unchanged.
// Here writer must be a ParquetWriter[GenericRecord], built as shown earlier.
val avroParquet: Flow[GenericRecord, GenericRecord, NotUsed] = AvroParquetFlow(writer)
val result =
  source
    .via(avroParquet)
    .runWith(Sink.seq)
```
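Putting the stages together, here is a minimal end-to-end sketch that copies every record from one Parquet file into another. The file paths and the actor-system setup are illustrative assumptions, not part of the snippets above:

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.avroparquet.scaladsl.{AvroParquetSink, AvroParquetSource}
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}

import scala.concurrent.Future

// Assumes Akka 2.6+, where the implicit ActorSystem provides the materializer.
implicit val system: ActorSystem = ActorSystem("avroparquet-example")

val conf = new Configuration()
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

// Hypothetical input and output paths.
val reader = AvroParquetReader
  .builder[GenericRecord](HadoopInputFile.fromPath(new Path("./in/test.parquet"), conf))
  .withConf(conf)
  .build()
val writer = AvroParquetWriter
  .builder[GenericRecord](HadoopOutputFile.fromPath(new Path("./out/test.parquet"), conf))
  .withConf(conf)
  .withSchema(schema) // schema derived as shown earlier
  .build()

// Read every record from the input file and write it to the output file.
val copied: Future[Done] = AvroParquetSource(reader).runWith(AvroParquetSink(writer))
```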
This is all the preparation that we are going to need.