
mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.

Home Page: https://mjakubowski84.github.io/parquet4s/

License: MIT License

Languages: Scala 100.00%
Topics: parquet, scala, hadoop, akka-streams, akka, streams, reader, writer, parquet-files, aws

parquet4s's Introduction

Parquet4S

Parquet4s is a simple I/O library for Parquet. It allows you to easily read and write Parquet files in Scala.

Use just a Scala case class to define the schema of your data. There is no need to use Avro, Protobuf, Thrift, or any other data serialisation system. If you prefer not to use case classes, you can use generic records instead.

Parquet4s is compatible with files generated by Apache Spark. However, unlike with Spark, you do not have to start a cluster to perform I/O operations.

Based on the official Parquet library, Hadoop Client, and Shapeless (Shapeless is not used in the Scala 3 version).

As it is based on Hadoop Client, you can connect to any Hadoop-compatible storage like AWS S3 or Google Cloud Storage.

Integrations are available for Akka Streams, Pekko Streams, and FS2.

Released for Scala 2.12.x, 2.13.x and 3.3.x.
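
A minimal sketch of what usage looks like, in the style of the snippets quoted in the issues below (the exact method signatures differ between releases, so treat this as illustrative rather than definitive and consult the documentation for the version you use):

import com.github.mjakubowski84.parquet4s.{ParquetReader, ParquetWriter}

case class User(id: Long, name: String)

val users = Seq(User(1L, "Alice"), User(2L, "Bob"))

// Write the sequence to a local Parquet file, using the case class as the schema.
ParquetWriter.writeAndClose("users.parquet", users)

// Read it back; the result is an Iterable that should be closed when done.
val readBack = ParquetReader.read[User]("users.parquet")
try readBack.foreach(println)
finally readBack.close()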

Documentation

Documentation is available here.

Contributing

Do you want to contribute? Please read the contribution guidelines.

Sponsors

parquet4s's People

Contributors

aravindr-dv, bbstilson, bwmcadams, calvinlfer, coolbetm, dkwi, farico, flipp5b, hochgi, huajiang-tubi, i10416, j-madden, marcinaylien, mikulskibartosz, mjakubowski84, moonkev, sndnv


parquet4s's Issues

Support for reading and writing mutable version of Scala collection

If anybody really needs this functionality, this issue can be addressed.

At the moment the library only allows immutable collections to be defined in the case class schema. Support for mutable collections like list, seq, array, etc. can be added by developers on their own by defining their own CollectionTransformer (https://github.com/mjakubowski84/parquet4s/blob/master/core/src/main/scala/com/github/mjakubowski84/parquet4s/CollectionTransformer.scala).
Regarding Map, one would need to define their own ValueCodec.
The library could provide such functionality out of the box.

Filter with `UserDefined` predicates?

I have a use case for filtering a Parquet file using a list of IDs. Building it up using the operators provided seems like it would be unwieldy, and possibly also quite slow. The Parquet library supports user-defined predicates, but, as far as I can tell, they are not exposed by the Filter mechanism.

Is there a way to supply a user-defined predicate, or would it be feasible to add that functionality?
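
For reference, a user-defined predicate in the underlying Parquet library itself looks roughly like the sketch below; it is written against org.apache.parquet.filter2.predicate (not against parquet4s, which is exactly the gap this issue is about), and IdInSetPredicate / keepIds are hypothetical names:

import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Statistics, UserDefinedPredicate}
import org.apache.parquet.io.api.Binary

// Hypothetical predicate keeping only rows whose "id" column is in a given set.
class IdInSetPredicate(keepIds: Set[String]) extends UserDefinedPredicate[Binary] with Serializable {
  override def keep(value: Binary): Boolean = keepIds.contains(value.toStringUsingUTF8)
  // Conservative: never drop a row group based on statistics alone.
  override def canDrop(statistics: Statistics[Binary]): Boolean = false
  override def inverseCanDrop(statistics: Statistics[Binary]): Boolean = false
}

val predicate: FilterPredicate =
  FilterApi.userDefined(FilterApi.binaryColumn("id"), new IdInSetPredicate(Set("a", "b", "c")))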

Minimal hadoop deps for core module

Hi!

Would you be open to a pull request which cuts down the dependencies for your core module from hadoop-client to hadoop-common?

Reasons someone using your project might want this:

  • they might be using a slightly different version of hadoop client
    • not different enough that writing parquet files is different
    • but different enough to cause plenty of dependency pain with peripheral libraries
  • they might not be using log4j, but depending on the hadoop-client aggregator package brings log4j into compile scope

I have a proof of concept at rtoomey-coatue@5d385d6 with your tests and integration tests passing.
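
For anyone hitting the same dependency pain before such a change lands, a rough build.sbt workaround might look like the sketch below (version placeholders are intentional, and whether hadoop-common alone suffices depends on which file systems you actually use):

libraryDependencies ++= Seq(
  // Drop the transitive hadoop-client aggregator...
  ("com.github.mjakubowski84" %% "parquet4s-core" % "<parquet4s-version>")
    .exclude("org.apache.hadoop", "hadoop-client"),
  // ...and depend on the slimmer hadoop-common directly.
  "org.apache.hadoop" % "hadoop-common" % "<hadoop-version>"
)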

Either way, your library is awesome and this is something Scala really needs! Also, I enjoyed reading your tests 😸

Hadoop configuration not used on path validation

Hi Marcin,

I am trying to set up an IndefiniteStreamParquetSink with a programmatically generated Hadoop Configuration which contains a key, secret, and endpoint, as stated in the documentation: https://github.com/mjakubowski84/parquet4s#passing-hadoop-configs-programmatically.

The IndefiniteStreamParquetSink validates the write path before it continues to create the flow. But it looks like the Hadoop configuration is not being used while validating the write path here:

val fs = path.getFileSystem(new Configuration())

It looks like it should use the Hadoop Configuration from the given write options, writeOptions.hadoopConf.
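
A one-line sketch of the change I have in mind:

// Use the user-supplied configuration instead of a fresh default one.
val fs = path.getFileSystem(writeOptions.hadoopConf)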

Let me know what you think before I create a Pull request :)

Regards,

Tim

Read subset of columns?

Should I expect to be able to use a ParquetReader to read a subset of the columns in a parquet file?

For example:

     case class Orig(a: String, b: Int)
     val originals: Seq[Orig] = ???
     ParquetWriter[Orig].write("my_file.parquet", originals)

     case class Projected(b: Int)
     val justBs = ParquetReader.read[Projected]("my_file.parquet")
     for { (original, justb) <- originals.zip(justBs) } {
          assert(original.b == justb.b)
     }

I think I've seen similar functionality advertised for hadoop's AvroParquetReader, but it does not seem to work with Parquet4s. (And maybe it is not intended to work, but it never hurts to ask ...)

Serdes functionality

Hello, I'm reading a raw Array[Byte] and want to reconstruct my Parquet object from it. Can I do that with the current implementation? If not, would you consider that a valuable contribution?

Thanks,
Boris

[question] Does parquet4s support partitioning?

Hi,

does parquet4s support partitioning?
E.g. by date: some_key/year=2019/month=9/day=24/

I think this could be quite tedious to implement if it's not supported by the lib, if it is possible at all.

Context: I'm trying to replace a Spark job that reads from Kafka and writes to S3 with an Akka Streams application.

Thanks!

Support for large case classes?

My case class is big (~10 fields, each is an Option[X] for a different X, where X is another case class containing 10 fields with type Option[String] or Option[Seq[String]] or Option[Y] where Y is another case class.... you get the idea)

The problem is that, when trying to resolve the schema, the scala compiler thinks the implicit expansion is divergent and aborts.

Do I have any options here besides describing the entire schema manually?

Enabling Incremental Writes in the core API

I have a use-case where I need to periodically append to a Parquet file and then eventually close it. Right now the write method assumes you have all of your data available (or at least a reference to a single iterable). This seems like it should be fairly simple to implement by exposing the functionality of the internalWriter, but it doesn't seem to align well with the current ParquetWriter type class, which only has one method, write(path, data, options). IMO it would be reasonable to make ParquetWriter a class like:

class IncrementalParquetWriter[T: ParquetRecordEncoder : ParquetSchemaResolver](path: String, options: ParquetWriter.Options = ParquetWriter.Options()) {
  private val writer = internalWriter(new Path(path), ParquetSchemaResolver.resolveSchema[T], options)
  private val valueCodecConfiguration = options.toValueCodecConfiguration
  def write(data: Iterable[T]): Unit = data.foreach { elem =>
    writer.write(ParquetRecordEncoder.encode[T](elem, valueCodecConfiguration))
  }
  def close(): Unit = writer.close()
}
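
Usage of the proposed class would then look something like this (Record is a hypothetical type whose encoder and schema resolver can be derived implicitly):

case class Record(id: Long, payload: String)

val writer = new IncrementalParquetWriter[Record]("records.parquet")
writer.write(Seq(Record(1L, "first batch")))
// ... later, append more data to the same open file ...
writer.write(Seq(Record(2L, "second batch")))
writer.close()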

But that breaks the interface currently defined by the type class and AFAIK is actually impossible to implement as a type class since it takes constructor parameters. Do you have any suggestions for how this should be implemented? Should it maybe just totally side-step the type class?

The only thing I've gotten working without totally wrecking the type class methods is the following, which feels very clunky:

/**
  * Type class that allows writing data whose schema is represented by type <i>T</i> to a given path.
  * @tparam T schema of data to write
  */
trait ParquetWriter[T] {

  /**
    * Writes data to given path.
    * @param path location where files are meant to be written
    * @param data data to write
    * @param options configuration of how Parquet files should be created and written
    */
  def write(path: String, data: Iterable[T], options: ParquetWriter.Options): Unit

  /**
    * Instantiate a new [[IncrementalParquetWriter]]
    * @param path The path to which this writer will write
    * @param options Options for writing
    */
  def incrementalWriter(path: String, options: ParquetWriter.Options): IncrementalParquetWriter[T]

}

Write readme

  • What?
    Parquet reader (maybe also writer in the future) for Scala
  • Why?
    I want to use a case class as a schema for a Parquet file. I don't want to use heavyweight Spark for that.
    So use parquet4s!
  • How to use?
    • Example for local
    • Example for s3
    • Example for Akka Streams
  • Add tags :)

How to write in streaming fashion?

I have a long-running application based on parquet4s that receives a continuous stream of records from Kafka and writes them out in large batches as parquet files.

It does not use Akka streams, but just relies on the core ParquetWriter in parquet4s.

Since the ParquetWriter requires me to pass a preassembled collection containing the entire batch, and I want my batches to be quite large, heap usage is problematic and flushing out a file places tremendous pressure on the garbage collector.

I have tried passing the ParquetWriter a lazy stream that doesn't read the records from Kafka until the Parquet writer iterates over it, but this did not seem to help.

Is my use case supported? If so, what should I do?

User supplied hadoop configuration not used in builder

Currently the Hadoop Parquet reader in ParquetIterableImpl is built without using the user-provided configuration, so it is always built with the default configuration. I would like to update this to add the config before building.

Thanks,
David

Support Parquet Read and Write from/to GCS

Currently, I am working on a use-case that involves reading and writing Parquet files to/from Google Cloud Storage from my Akka Streams application.
GCS uses the Cloud Storage Connector (https://github.com/GoogleCloudPlatform/bigdata-interop/tree/master/gcs), which contains the implementation of the gs file scheme. HadoopParquetWriter needs certain Hadoop fs confs set in order to interact with GCS (https://github.com/GoogleCloudPlatform/bigdata-interop/blob/master/gcs/INSTALL.md).
I noticed, however, that the current implementation of the ParquetWriter does not pass these confs.
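
For context, the kind of Hadoop configuration entries involved looks roughly like the sketch below (keys per the Cloud Storage Connector install docs linked above; the exact set depends on the connector version and the authentication method):

import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// Register the gs:// scheme with the Cloud Storage Connector.
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
// Project and credentials settings go here as well, e.g.:
conf.set("fs.gs.project.id", "<your-gcp-project>")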

As a quick solution, I forked the ParquetWriter class and modified the customInternalWriter method to accept Hadoop conf as an argument. This worked beautifully.

@mjakubowski84 I wanted to know whether you think this is a reasonable approach. Do you see any potential problems with it?
Happy to submit the PR if you think this is a good solution.

Thanks

How do I use an existing MessageType?

I already have a MessageType that I'd like to use to write Parquet files. Most of the actual message type schema setting is private; is there a way to do this that I'm missing?

POST-Release: update Readme and Notice

Update documents AFTER release:

  • no more need for NOTICE as Spark code is deleted
  • update README:
    • new version of lib in deps
    • no more need for imports of implicits
    • document how to write with core and Akka
    • document how to extend the libs with custom type classes
  • introduce CONTRIBUTIONS!

Add support for java.time.LocalDate and java.time.LocalDateTime

Hello @mjakubowski84

First, let me say that this is a wonderful project. I've been hoping for something like this for quite a while, so many thanks! I am raising this issue because most often we are working with the Java 8 date/time API. It is oftentimes difficult to work with the java.sql versions, and much conversion is done on our side to accommodate using them in the case classes that we use with parquet4s. My proposal would be to support both the java.sql date/time classes and the java.time classes.
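
To make the proposal concrete, a case class like the following (hypothetical field names) is what we would like to be able to use directly:

import java.time.{LocalDate, LocalDateTime}

// Hypothetical schema using java.time types instead of the java.sql ones.
case class Event(id: String, eventDate: LocalDate, createdAt: LocalDateTime)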

Also, we have even run into issues when rendering certain timestamps in an Impala table, where a UTC midnight is instead rendered as the previous day with 24:00 as the time (I'm guessing this has to do with the day and the actual epoch being encoded separately in Parquet, but I'm not too familiar with exactly how timestamps are encoded in Parquet). So essentially they represent the same instant but are rendered differently. However, I think this is more due to #21 than to the data type.

I have made a commit on my fork that adds support for the java.time API and makes the timezone on TimeValueCodecs configurable (that was really the best way I could think of to make it configurable; I was thinking of a system property, but that makes testing a little awkward and locks you in for the lifetime of your application). I have also updated the test cases to account for these changes. The commit can be found here.

Please let me know if this commit looks suitable to you and I will raise a pull request. Also, if there are any changes that you would like me to make to this, I am more than happy to implement them.

Unit test of ParquetReader Iterable

To include:

  • whether multiple iterators are closed when close is called on the iterable
  • multiple hasNext calls
  • whether all rows are read
  • reading of empty data
  • calling next when no more data is available
