parquet-go / parquet-go

This project forked from segmentio/parquet-go


High-performance Go package to read and write Parquet files

Home Page: https://pkg.go.dev/github.com/parquet-go/parquet-go

License: Apache License 2.0

Languages: Go 86.26%, Assembly 13.73%, Makefile 0.02%
Topics: columnar-format, golang, parquet, parquet-files, performance

parquet-go's Introduction


High-performance Go library to manipulate parquet files, initially developed at Twilio Segment.


Motivation

Parquet has become a well-established format for representing columnar data on persistent storage, achieving levels of compression and query performance that make it practical to manage data sets reaching petabyte scale. In addition, having data-intensive applications share a common format creates opportunities for interoperability across tool kits, providing greater leverage and value to the engineers who maintain and operate those systems.

The creation and evolution of large-scale data management systems, combined with real-time expectations, comes with challenging maintenance and performance requirements that existing Go solutions for parquet were not addressing.

The parquet-go/parquet-go package was designed and developed to respond to those challenges, offering high-level APIs to read and write parquet files while keeping a low compute and memory footprint, so that it can be used in environments where data volumes and cost constraints require software to achieve high levels of efficiency.

Specification

Columnar storage allows Parquet to store data more efficiently than, say, using JSON or Protobuf. For more information, refer to the Parquet Format Specification.

Installation

The package is distributed as a standard Go module that programs can take a dependency on and install with the following command:

go get github.com/parquet-go/parquet-go

Go 1.21 or later is required to use the package.
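
For orientation, here is a minimal, self-contained sketch that uses the parquet.WriteFile and parquet.ReadFile helpers described in the sections below; the file name is illustrative:

package main

import (
    "fmt"
    "log"

    "github.com/parquet-go/parquet-go"
)

type RowType struct{ FirstName, LastName string }

func main() {
    // Write a couple of rows to a parquet file on disk.
    if err := parquet.WriteFile("file.parquet", []RowType{
        {FirstName: "Bob"},
        {FirstName: "Alice"},
    }); err != nil {
        log.Fatal(err)
    }

    // Read the rows back and print them.
    rows, err := parquet.ReadFile[RowType]("file.parquet")
    if err != nil {
        log.Fatal(err)
    }
    for _, row := range rows {
        fmt.Printf("%+v\n", row)
    }
}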

Compatibility Guarantees

The package is currently released as a pre-v1 version, which gives maintainers the freedom to break backward compatibility in order to improve the APIs as we learn which initial design decisions need to be revisited to better support the library's use cases. Such breaking changes are expected to be rare, and documentation will be produced to guide users on how to adapt their programs to them.

Usage

The following sections describe how to use APIs exposed by the library, highlighting the use cases with code examples to demonstrate how they are used in practice.

Writing Parquet Files: parquet.GenericWriter[T]

A parquet file is a collection of rows sharing the same schema, arranged in columns to support faster scan operations on subsets of the data set.

For simple use cases, the parquet.WriteFile[T] function allows the creation of parquet files on the file system from a slice of Go values representing the rows to write to the file.

type RowType struct { FirstName, LastName string }

if err := parquet.WriteFile("file.parquet", []RowType{
    {FirstName: "Bob"},
    {FirstName: "Alice"},
}); err != nil {
    ...
}

The parquet.GenericWriter[T] type denormalizes rows into columns, then encodes the columns into a parquet file, generating row groups, column chunks, and pages based on configurable heuristics.

type RowType struct { FirstName, LastName string }

writer := parquet.NewGenericWriter[RowType](output)

_, err := writer.Write([]RowType{
    ...
})
if err != nil {
    ...
}

// Closing the writer is necessary to flush buffers and write the file footer.
if err := writer.Close(); err != nil {
    ...
}

Explicit declaration of the parquet schema on a writer is useful when the application needs to ensure that data written to a file adheres to a predefined schema, which may differ from the schema derived from the writer's type parameter. The parquet.Schema type is an in-memory representation of the schema of parquet rows, translated from the type of Go values, and can be used for this purpose.

schema := parquet.SchemaOf(new(RowType))
writer := parquet.NewGenericWriter[any](output, schema)
...

Reading Parquet Files: parquet.GenericReader[T]

For simple use cases where the data set fits in memory and the program will read most rows of the file, the parquet.ReadFile[T] function returns a slice of Go values representing the rows read from the file.

type RowType struct { FirstName, LastName string }

rows, err := parquet.ReadFile[RowType]("file.parquet")
if err != nil {
    ...
}

for _, c := range rows {
    fmt.Printf("%+v\n", c)
}

The expected schema of rows can be explicitly declared when the reader is constructed, which is useful to ensure that the program receives rows matching a specific format; for example, when dealing with files from remote storage sources that applications cannot trust to have used an expected schema.

Configuring the schema of a reader is done by passing a parquet.Schema instance as argument when constructing a reader. When the schema is declared, conversion rules implemented by the package are applied to ensure that rows read by the application match the desired format (see Evolving Parquet Schemas).

schema := parquet.SchemaOf(new(RowType))
reader := parquet.NewReader(file, schema)
...
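
The same can be done with the generic reader. A minimal sketch, assuming the RowType and file values from the previous examples (Read returns io.EOF once all rows have been consumed):

schema := parquet.SchemaOf(new(RowType))
reader := parquet.NewGenericReader[RowType](file, schema)
defer reader.Close()

rows := make([]RowType, 10)
n, err := reader.Read(rows)
if err != nil && err != io.EOF {
    ...
}
// rows[:n] holds the rows that were read, converted to the declared schema.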

Inspecting Parquet Files: parquet.File

Sometimes, lower-level APIs can be useful to leverage the columnar layout of parquet files. The parquet.File type is intended to provide such features to Go applications, by exposing APIs to iterate over the various parts of a parquet file.

f, err := parquet.OpenFile(file, size)
if err != nil {
    ...
}

for _, rowGroup := range f.RowGroups() {
    for _, columnChunk := range rowGroup.ColumnChunks() {
        ...
    }
}

Evolving Parquet Schemas: parquet.Convert

Parquet files embed all the metadata necessary to interpret their content, including a description of the schema of the tables represented by the rows and columns they contain.

Parquet files are also immutable; once written, there is no mechanism for updating a file. If their contents need to be changed, rows must be read, modified, and written to a new file.
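
For example, a sketch of that read-modify-write cycle using the RowType from the earlier examples (the file names are illustrative):

rows, err := parquet.ReadFile[RowType]("old.parquet")
if err != nil {
    ...
}

// Modify the rows in memory before writing them to a new file.
for i := range rows {
    rows[i].LastName = strings.ToUpper(rows[i].LastName)
}

if err := parquet.WriteFile("new.parquet", rows); err != nil {
    ...
}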

Because applications evolve, the schemas written to parquet files also tend to evolve over time. This creates challenges when applications need to operate on parquet files with heterogeneous schemas: algorithms that expect new columns to exist may have issues dealing with rows that come from files with mismatched schema versions.

To help build applications that can handle evolving schemas, parquet-go/parquet-go implements conversion rules that create views of row groups to translate between schema versions.

The parquet.Convert function is the low-level routine constructing conversion rules from a source to a target schema. The function is used to build converted views of parquet.RowReader or parquet.RowGroup, for example:

type RowTypeV1 struct { ID int64; FirstName string }
type RowTypeV2 struct { ID int64; FirstName, LastName string }

source := parquet.SchemaOf(RowTypeV1{})
target := parquet.SchemaOf(RowTypeV2{})

conversion, err := parquet.Convert(target, source)
if err != nil {
    ...
}

targetRowGroup := parquet.ConvertRowGroup(sourceRowGroup, conversion)
...

Conversion rules are automatically applied by the parquet.CopyRows function when the reader and writer passed to the function also implement the parquet.RowReaderWithSchema and parquet.RowWriterWithSchema interfaces. The copy determines whether the reader and writer schemas can be converted from one to the other, and automatically applies the conversion rules to translate between the schemas.
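
A sketch of this behavior, assuming the RowTypeV1 and RowTypeV2 types from the example above and suitable input and output values: because the generic reader and writer both carry their schemas, the copy translates rows from the V1 layout to the V2 layout.

reader := parquet.NewGenericReader[RowTypeV1](input)
defer reader.Close()

writer := parquet.NewGenericWriter[RowTypeV2](output)

// CopyRows applies the conversion rules between the two schemas.
if _, err := parquet.CopyRows(writer, reader); err != nil {
    ...
}
if err := writer.Close(); err != nil {
    ...
}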

At this time, conversion rules only support adding or removing columns from the schemas; there are no type conversions and no way to rename columns. More advanced conversion rules may be added in the future.

Sorting Row Groups: parquet.GenericBuffer[T]

The parquet.GenericWriter[T] type is optimized for minimal memory usage, keeping the order of rows unchanged and flushing pages as soon as they are filled.

Parquet supports expressing columns by which rows are sorted through the declaration of sorting columns on row groups. Sorting row groups requires buffering all rows before ordering and writing them to a parquet file.

To help with those use cases, the parquet-go/parquet-go package exposes the parquet.GenericBuffer[T] type which acts as a buffer of rows and implements sort.Interface to allow applications to sort rows prior to writing them to a file.

The columns that rows are ordered by are configured when creating parquet.GenericBuffer[T] instances using the parquet.SortingColumns function to construct row group options configuring the buffer. The type of parquet columns defines how values are compared, see Parquet Logical Types for details.

When written to a file, the buffer is materialized into a single row group with the declared sorting columns. After being written, buffers can be reused by calling their Reset method.

The following example shows how to use a parquet.GenericBuffer[T] to order rows written to a parquet file:

type RowType struct { FirstName, LastName string }

buffer := parquet.NewGenericBuffer[RowType](
    parquet.SortingRowGroupConfig(
        parquet.SortingColumns(
            parquet.Ascending("LastName"),
            parquet.Ascending("FistName"),
        ),
    ),
)

buffer.Write([]RowType{
    {FirstName: "Luke", LastName: "Skywalker"},
    {FirstName: "Han", LastName: "Solo"},
    {FirstName: "Anakin", LastName: "Skywalker"},
})

sort.Sort(buffer)

writer := parquet.NewGenericWriter[RowType](output)
_, err := parquet.CopyRows(writer, buffer.Rows())
if err != nil {
    ...
}
if err := writer.Close(); err != nil {
    ...
}
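
Reusing the same buffer for a second file might look like the following sketch, where nextOutput is an illustrative destination:

// Reset clears the buffered rows so the buffer can be filled again.
buffer.Reset()

buffer.Write([]RowType{
    {FirstName: "Leia", LastName: "Organa"},
})
sort.Sort(buffer)

nextWriter := parquet.NewGenericWriter[RowType](nextOutput)
if _, err := parquet.CopyRows(nextWriter, buffer.Rows()); err != nil {
    ...
}
if err := nextWriter.Close(); err != nil {
    ...
}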

Merging Row Groups: parquet.MergeRowGroups

Parquet files are often used as part of the underlying engine of data processing or storage layers, in which case merging multiple row groups into one that contains more rows can be a useful operation to improve query performance; for example, bloom filters in parquet files are stored per row group, so the larger the row group, the fewer filters need to be stored and the more effective they become.

The parquet-go/parquet-go package supports creating merged views of row groups, where the view contains all the rows of the merged groups, maintaining the order defined by the sorting columns of the groups.

There are a few constraints when merging row groups:

  • The sorting columns of all the row groups must be the same, or the merge operation must be explicitly configured with a set of sorting columns which are a prefix of the sorting columns of all merged row groups (see the sketch after this list).

  • The schemas of row groups must all be equal, or the merge operation must be explicitly configured with a schema that all row groups can be converted to, in which case the limitations of schema conversions apply.
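
The following sketch shows what such an explicit configuration might look like, assuming that, as with parquet.GenericBuffer[T], a schema and a sorting configuration can be passed as row group options:

schema := parquet.SchemaOf(new(RowType))

merged, err := parquet.MergeRowGroups(rowGroups,
    schema,
    parquet.SortingRowGroupConfig(
        parquet.SortingColumns(
            parquet.Ascending("LastName"),
        ),
    ),
)
if err != nil {
    ...
}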

Once a merged view is created, it may be written to a new parquet file or buffer in order to create a larger row group:

merge, err := parquet.MergeRowGroups(rowGroups)
if err != nil {
    ...
}

writer := parquet.NewGenericWriter[RowType](output)
_, err = parquet.CopyRows(writer, merge)
if err != nil {
    ...
}
if err := writer.Close(); err != nil {
    ...
}

Using Bloom Filters: parquet.BloomFilter

Parquet files can embed bloom filters to help improve the performance of point lookups in the files. The format of parquet bloom filters is documented in the parquet specification: Parquet Bloom Filter

By default, no bloom filters are created in parquet files, but applications can configure the list of columns to create filters for using the parquet.BloomFilters option when instantiating writers; for example:

type RowType struct {
    FirstName string `parquet:"first_name"`
    LastName  string `parquet:"last_name"`
}

const filterBitsPerValue = 10
writer := parquet.NewGenericWriter[RowType](output,
    parquet.BloomFilters(
        // Configures the writer to generate split-block bloom filters for the
        // "first_name" and "last_name" columns of the parquet schema of rows
        // written by the application.
        parquet.SplitBlockFilter(filterBitsPerValue, "first_name"),
        parquet.SplitBlockFilter(filterBitsPerValue, "last_name"),
    ),
)
...

Generating bloom filters requires knowing how many values exist in a column chunk in order to properly size the filter, which in turn requires buffering all the values written to the column in memory. Because of this, the memory footprint of parquet.GenericWriter[T] increases linearly with the number of columns that the writer needs to generate filters for. This extra cost is optimized away when rows are copied from a parquet.GenericBuffer[T] to a writer, since in that case the number of values per column is known up front: the buffer already holds all the values in memory.
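
A sketch of that buffered path, assuming rows is a []RowType and reusing the writer configuration from the example above; because the buffer already holds every value in memory, the filter sizes are known before any page is flushed:

buffer := parquet.NewGenericBuffer[RowType]()
buffer.Write(rows)

writer := parquet.NewGenericWriter[RowType](output,
    parquet.BloomFilters(
        parquet.SplitBlockFilter(filterBitsPerValue, "first_name"),
        parquet.SplitBlockFilter(filterBitsPerValue, "last_name"),
    ),
)
// Copying from the buffer lets the writer size the bloom filters up front.
if _, err := parquet.CopyRows(writer, buffer.Rows()); err != nil {
    ...
}
if err := writer.Close(); err != nil {
    ...
}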

When reading parquet files, column chunks expose the generated bloom filters with the parquet.ColumnChunk.BloomFilter method, returning a parquet.BloomFilter instance if a filter was available, or nil when there were no filters.

Using bloom filters in parquet files is useful when performing point lookups, that is, searching for rows in which a column matches a given value. Programs can quickly eliminate column chunks that they know do not contain the value they are searching for by checking the filter first, which is often multiple orders of magnitude faster than scanning the column.

The following code snippet highlights how filters are typically used:

var candidateChunks []parquet.ColumnChunk

for _, rowGroup := range file.RowGroups() {
    columnChunk := rowGroup.ColumnChunks()[columnIndex]
    bloomFilter := columnChunk.BloomFilter()

    if bloomFilter != nil {
        if ok, err := bloomFilter.Check(value); err != nil {
            ...
        } else if !ok {
            // Bloom filters may return false positives, but never false negatives,
            // so we know this column chunk does not contain the value.
            continue
        }
    }

    candidateChunks = append(candidateChunks, columnChunk)
}

Optimizations

The following sections describe common optimization techniques supported by the library.

Optimizing Reads

Lower level APIs used to read parquet files offer more efficient ways to access column values. Consecutive sequences of values are grouped into pages which are represented by the parquet.Page interface.

A column chunk may contain multiple pages, each holding a section of the column values. Applications can retrieve the column values either by reading them into buffers of parquet.Value, or type asserting the pages to read arrays of primitive Go values. The following example demonstrates how to use both mechanisms to read column values:

pages := column.Pages()
defer func() {
    checkErr(pages.Close())
}()

for {
    p, err := pages.ReadPage()
    if err != nil {
        ... // io.EOF when there are no more pages
    }

    switch page := p.Values().(type) {
    case parquet.Int32Reader:
        values := make([]int32, page.NumValues())
        _, err := page.ReadInt32s(values)
        ...
    case parquet.Int64Reader:
        values := make([]int64, page.NumValues())
        _, err := page.ReadInt64s(values)
        ...
    default:
        values := make([]parquet.Value, page.NumValues())
        _, err := page.ReadValues(values)
        ...
    }
}

Reading arrays of typed values is often preferable when performing aggregations on the values as this model offers a more compact representation of the values in memory, and pairs well with the use of optimizations like SIMD vectorization.
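
For instance, a small sketch of such an aggregation, summing an int64 column by draining a parquet.Int64Reader (assumes the io and parquet imports):

// sumInt64s reads batches of int64 values from the reader until io.EOF
// and returns their sum.
func sumInt64s(r parquet.Int64Reader) (int64, error) {
    var sum int64
    buf := make([]int64, 1024)
    for {
        n, err := r.ReadInt64s(buf)
        for _, v := range buf[:n] {
            sum += v
        }
        if err == io.EOF {
            return sum, nil
        }
        if err != nil {
            return sum, err
        }
    }
}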

Optimizing Writes

Applications that deal with columnar storage are sometimes designed to work with columnar data throughout the abstraction layers; it then becomes possible to write columns of values directly instead of reconstructing rows from the column values. The package offers two main mechanisms to satisfy those use cases:

A. Writing Columns of Typed Arrays

The first solution assumes that the program works with in-memory arrays of typed values, for example slices of primitive Go types like []float32; this would be the case if the application is built on top of a framework like Apache Arrow.

parquet.GenericBuffer[T] is an implementation of the parquet.RowGroup interface which maintains in-memory buffers of column values. Rows can be written either by boxing primitive values into arrays of parquet.Value, or by type asserting the columns to access specialized versions of the write methods that accept arrays of Go primitive types.

When using either of these models, the application is responsible for ensuring that the same number of rows are written to each column or the resulting parquet file will be malformed.

The following examples demonstrate how to use these two models to write columns of Go values:

type RowType struct { FirstName, LastName string }

func writeColumns(buffer *parquet.GenericBuffer[RowType], firstNames []string) error {
    values := make([]parquet.Value, len(firstNames))
    for i := range firstNames {
        values[i] = parquet.ValueOf(firstNames[i])
    }
    _, err := buffer.ColumnBuffers()[0].WriteValues(values)
    return err
}

type RowType struct { ID int64; Value float32 }

func writeColumns(buffer *parquet.GenericBuffer[RowType], ids []int64, values []float32) error {
    if len(ids) != len(values) {
        return fmt.Errorf("number of ids and values mismatch: ids=%d values=%d", len(ids), len(values))
    }
    columns := buffer.ColumnBuffers()
    if _, err := columns[0].(parquet.Int64Writer).WriteInt64s(ids); err != nil {
        return err
    }
    if _, err := columns[1].(parquet.FloatWriter).WriteFloats(values); err != nil {
        return err
    }
    return nil
}

The latter is more efficient as it does not require boxing the input into an intermediary array of parquet.Value. However, it may not always be the right model depending on the situation; sometimes the generic abstraction is the more expressive one.

B. Implementing parquet.RowGroup

Programs that need full control over the construction of row groups can choose to provide their own implementation of the parquet.RowGroup interface, which includes defining implementations of parquet.ColumnChunk and parquet.Page to expose column values of the row group.

This model can be preferable when the underlying storage or in-memory representation of the data needs to be optimized further than what can be achieved by using an intermediary buffering layer with parquet.GenericBuffer[T].

See parquet.RowGroup for the full interface documentation.

C. Using on-disk page buffers

When generating parquet files, the writer needs to buffer all pages before it can create the row group. This may require significant amounts of memory as the entire file content must be buffered prior to generating it. In some cases, the files might even be larger than the amount of memory available to the program.

The parquet.GenericWriter[T] can be configured to use disk storage instead as a scratch buffer when generating files, by configuring a different page buffer pool using the parquet.ColumnPageBuffers option and parquet.PageBufferPool interface.

The parquet-go/parquet-go package provides an implementation of the interface which uses temporary files to store pages while a file is generated, allowing programs to use local storage as swap space to hold pages and keep memory utilization to a minimum. The following example demonstrates how to configure a parquet writer to use on-disk page buffers:

type RowType struct { ... }

writer := parquet.NewGenericWriter[RowType](output,
    parquet.ColumnPageBuffers(
        parquet.NewFileBufferPool("", "buffers.*"),
    ),
)

When a row group is complete, pages buffered to disk need to be copied back to the output file. This doubles the I/O operations and storage space requirements (the system needs enough free disk space to hold two copies of the file). The resulting write amplification can often be optimized away by the kernel if the file system supports copy-on-write of disk pages, since copies between os.File instances are performed using copy_file_range(2) on Linux.

See parquet.PageBufferPool for the full interface documentation.

Maintenance

While the initial design and development occurred at Twilio Segment, the project is now maintained by the open source community. We welcome external contributors to participate in the form of discussions or code changes. Please review the Contribution guidelines as well as the Code of Conduct before submitting contributions.

Continuous Integration

The project uses GitHub Actions for CI.

Debugging

The package has debugging capabilities built in which can be turned on using the PARQUETGODEBUG environment variable. The value follows a model similar to GODEBUG: it must be formatted as a comma-separated list of key=value pairs.

The following debug flags are currently supported (an example invocation follows the list):

  • tracebuf=1 turns on tracing of internal buffers, which validates that reference counters are set to zero when buffers are reclaimed by the garbage collector. When the package detects that a buffer was leaked, it logs an error message along with the stack trace captured when the buffer was last used.
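
For example, buffer tracing can be enabled for a test run like this (the command is illustrative):

PARQUETGODEBUG=tracebuf=1 go test ./...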

parquet-go's People

Contributors

achille-roussel, annanay25, asubiotto, bartleyg, bprosnitz, brancz, cyriltovena, dependabot[bot], derekperkins, forsaken628, fpetkovski, gernest, hhoughgg, javierhonduco, jeffail, joe-elliott, kevinburkesegment, mapno, mdisibio, michaelurman, ngotchac, pelletier, pryz, shinena1998, stoewer, thorfour, tschaub, yonesko, zolstein, zolstein-clumio


parquet-go's Issues

Problems with dynamic schemas and writing with optional fields

I am new to this package and have a requirement to build dynamic schemas at runtime, but I am having problems getting the whole solution to work. I have seen many other issues regarding this, but have been unable to piece together a full solution. Most of the other issues are either unanswered or don't have a full code snippet that works. As part of this, I also need Optional fields.

These are all the similar issues I have read and have not been able to figure it out.


I would like to create a schema at runtime. Both of these two options appear to work correctly for me when I create the schema.

schema := parquet.NewSchema("runtime_schema", parquet.Group{
	"id":  parquet.Optional(parquet.Int(32)),
	"age": parquet.Optional(parquet.Int(32)),
})
structFields := []reflect.StructField{}

tp := reflect.TypeOf(int32(0))

structFields = append(structFields, reflect.StructField{
	Name: strings.ToUpper("age"),
	Type: tp,
	Tag:  reflect.StructTag(fmt.Sprintf(`parquet:"%v,optional"`, "age")),
})
structFields = append(structFields, reflect.StructField{
	Name: strings.ToUpper("id"),
	Type: tp,
	Tag:  reflect.StructTag(fmt.Sprintf(`parquet:"%v,optional"`, "id")),
})

structType := reflect.StructOf(structFields)
structElem := reflect.New(structType)

schema := parquet.NewSchema("runtime_schema", parquet.SchemaOf(structElem.Interface()))

Both of those options create the same schema as expected:

message runtime_schema {
        optional int32 age (INT(32,true));
        optional int32 id (INT(32,true));
}

I am not sure which option is better. I have seen both ways suggested in different issues: #30 (comment), segmentio#311 (comment).

Now, the problem I am running into is using the GenericWriter with this schema. I am unsure of what to put for the type parameter, so I have been using any. It compiles and effectively "works", but nothing I have tried works with Optional fields.

When trying to use WriteRows with optional fields, no matter how the schema is defined (typed struct, Group, or reflection), the optional fields are always null. This bug was noted here: #25 (comment); although the comment made it seem like it only affected the deprecated Writer API, it appears to apply to the GenericWriter as well. Example of the bug with GenericWriter: #30 (comment).

Now, I don't have a specific need for WriteRows right now, but it's the only way I could get it to compile or not panic when writing. When trying to use Write instead, I run into panics, usually from some reflection call. I'm sure I'm just making a simple mistake.

I am able to use Write like this:

func main() {
	schema := parquet.NewSchema("runtime_schema", parquet.Group{
		"id":  parquet.Optional(parquet.Int(32)),
		"age": parquet.Optional(parquet.Int(32)),
	})

	var f bytes.Buffer
	pw := parquet.NewGenericWriter[any](&f, schema)

	a := []any{
		map[string]int32{
			"id":  20,
			"age": 22,
		},
	}

	_, err := pw.Write(a)
	if err != nil {
		log.Fatal(err)
	}
	if err := pw.Close(); err != nil {
		log.Fatal(err)
	}

	reader := parquet.NewGenericReader[any](bytes.NewReader(f.Bytes()), schema)
	out := make([]parquet.Row, 1)
	if _, err := reader.ReadRows(out); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v", out)
}

Prints: [[C:0 D:1 R:0 V:22 C:1 D:1 R:0 V:20]]

However, this very simplified schema will not work for my use case, because the schema isn't just of one type. So I would need to use a map[string]any, but it panics.

Example:

func main() {
	schema := parquet.NewSchema("runtime_schema", parquet.Group{
		"id":   parquet.Optional(parquet.Int(32)),
		"age":  parquet.Optional(parquet.Int(32)),
		"name": parquet.Optional(parquet.String()),
	})

	var f bytes.Buffer
	pw := parquet.NewGenericWriter[any](&f, schema)

	a := []any{
		map[string]any{
			"id":   20,
			"age":  22,
			"name": "bob",
		},
	}

	_, err := pw.Write(a)
	if err != nil {
		log.Fatal(err)
	}
	if err := pw.Close(); err != nil {
		log.Fatal(err)
	}

	reader := parquet.NewGenericReader[any](bytes.NewReader(f.Bytes()), schema)
	out := make([]parquet.Row, 1)
	if _, err := reader.ReadRows(out); err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%+v", out)
}

This panics with:

panic: cannot create parquet value of type INT32 from go value of type interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.makeValue(0x1, 0xc00018a930, {0x796100?, 0xc0000254a0?, 0xc000193a60?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/value.go:339 +0xd89
github.com/parquet-go/parquet-go.deconstructFuncOfLeaf.func1({0xc00009c3c0, 0x3, 0xc0000a7410?}, {0xd0?, 0x71?, 0x41?}, {0x796100?, 0xc0000254a0?, 0x198?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:538 +0x99
github.com/parquet-go/parquet-go.deconstructFuncOfOptional.func1({0xc00009c3c0?, 0x3?, 0x3?}, {0x0?, 0x0?, 0x0?}, {0x796100?, 0xc0000254a0?, 0xc000193bd0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:439 +0x103
github.com/parquet-go/parquet-go.deconstructFuncOfGroup.func1({0xc00009c3c0, 0x3, 0x3}, {0xc0?, 0x3b?, 0x19?}, {0x79d840?, 0xc0000a7410?, 0x7fb0e0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/row.go:515 +0x19c
github.com/parquet-go/parquet-go.(*Schema).deconstructValueToColumns(0xc000092300, {0xc00009c3c0, 0x3, 0x3}, {0x79d840?, 0xc0000a7410?, 0x7a8400?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:236 +0x114
github.com/parquet-go/parquet-go.(*Schema).Deconstruct(0xc000092300, {0x0, 0x0, 0x0}, {0x79d840?, 0xc0000a7410})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:224 +0x1f4
github.com/parquet-go/parquet-go.(*Writer).Write(0xc0000925a0, {0x79d840, 0xc0000a7410})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:379 +0x1d6
github.com/parquet-go/parquet-go.(*GenericWriter[...]).writeAny(0x0, {0xc000025490?, 0x1, 0x10})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:233 +0x65
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write.func1(0xc000193e30?)
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:169 +0x62
github.com/parquet-go/parquet-go.(*writer).writeRows(0xc000188160, 0x1, 0xc000193e28)
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:993 +0xbb
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write(0xc0000a7410?, {0xc000025490?, 0x4?, 0xc0000686b0?})
        /home/kason/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:168 +0x65
main.main()
        /home/kason/nuspire/nusiem-log-mover/cmd/parquet2/main.go:36 +0x4cd
exit status 2

Any help would be greatly appreciated! Thank you.

How to keep columns order definition ?

How can I maintain the order of column definitions?
It appears to have been resolved by PR 140, but I have yet to discover how to do this.

package main

import (
	"fmt"

	"github.com/parquet-go/parquet-go"
)

func main() {

	group := parquet.Group{
		"C": parquet.String(), // 0
		"B": parquet.String(), // 1
		"A": parquet.String(), // 2
	}

	schema := parquet.NewSchema("runtime_schema", group)

	for i, column := range schema.Columns() {
		fmt.Println(i, column[0])
	}

	/*
		0 A
		1 B
		2 C
	*/
}

`SortingWriter` only writes a single row group

While using the SortingWriter API I noticed that my files were awfully small. I expected them to be >1GiB but they were around 2 MiB. This did not happen previously when I used it a few months ago.

I wrote up a short repro in Go.
I used lz4_raw_compressed_larger.parquet from the apache/parquet-testing repo. The script reads this file, which has 10,000 rows, and then writes a new copy of the input using a SortingWriter. We set sortRowCount to 1000 in the SortingWriter to reproduce the issue. The issue is not observed when sortRowCount is greater than 10,000.
With these settings, we observe only 1024 rows being written, where we expected all 10,000.

Code
package main

import (
	"os"

	"github.com/parquet-go/parquet-go"
	"github.com/sirupsen/logrus"
)

func main() {
	type Record struct {
		A string `parquet:"a"`
	}

	fi, err := os.Open("lz4_raw_compressed_larger.parquet")
	if err != nil {
		logrus.WithError(err).Fatal("couldn't open input")
	}

	pr := parquet.NewGenericReader[Record](fi, parquet.DefaultReaderConfig())
	defer pr.Close()

	out, err := os.Create("test.parquet")
	if err != nil {
		logrus.WithError(err).Fatal("couldn't create output file")
	}

	pw := parquet.NewSortingWriter[Record](
		out,
		1000,
		parquet.SortingWriterConfig(
			parquet.SortingColumns(parquet.Ascending("a")),
		),
	)

	if _, err := parquet.CopyRows(pw, pr); err != nil {
		logrus.WithError(err).Fatal("couldn't copy rows")
	}

	if err := pw.Close(); err != nil {
		logrus.WithError(err).Fatal("couldn't close parquet writer")
	}

	if err := out.Sync(); err != nil {
		logrus.WithError(err).Fatal("couldn't sync to disk")
	}

	result := parquet.NewGenericReader[Record](out, parquet.DefaultReaderConfig())
	defer result.Close()
	if result.NumRows() != pr.NumRows() {
		logrus.Fatalf("resulting parquet file did not have expected rows (expected %d, got %d)", pr.NumRows(), result.NumRows())
	}
}

// ❯ go run .
// FATA[0000] resulting parquet file did not have expected rows (expected 10000, got 1024)

While digging into the issue, I noticed a piece of code that was added recently:

parquet-go/merge.go

Lines 111 to 118 in d42d339

func (r *mergedRowGroupRows) WriteRowsTo(w RowWriter) (n int64, err error) {
b := newMergeBuffer()
b.setup(r.rows, r.merge.compare)
n, err = b.WriteRowsTo(w)
r.rowIndex += int64(n)
b.release()
return
}

It appears that it was added to fix a data corruption issue in #31. In any case, I commented out this piece of code, so as to make CopyRows fall back to the existing RowReader API, and that fixed the above repro script I made.

My conclusion is that the addition of mergeBuffer probably introduced the bug I am seeing right now.

Support nested array fields

Description

There seem to be some problems if the Go struct has a nested array field (e.g. [][]string). You can check the following example code.
NestedArrayWithTag has a Field of type [][]int; however, when I use NewGenericWriter() to write it to a file, Field becomes type []int. It is rather unexpected.

package main

import (
	"log"
	"os"

	"github.com/parquet-go/parquet-go"
)

type NestedArrayWithTag struct {
	Field [][]int `parquet:"field,list"`
}

func main() {
	output, _ := os.Create("file.parquet")
	defer output.Close()

	writer := parquet.NewGenericWriter[NestedArrayWithTag](output)
	_, err := writer.Write([]NestedArrayWithTag{
		{
			Field: [][]int{{1}},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := writer.Close(); err != nil {
		log.Fatal(err)
	}

	// contents of file.parquet

	// ##################
	// File: file.parquet
	// ##################
	// {field: [1]}

        // should be {field: [[1]]}
}

PS. I have used a list tag to prevent this problem.

If I use parquet-cli to cat the file, it raises the following exception

parquet cat file.parquet
Unknown error
java.io.FileNotFoundException: File file:/Users/xxx/others/parquet-go/file.parquet does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:668)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:989)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:658)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:458)
        at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:148)
        at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:349)
        at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
        at org.apache.parquet.cli.util.SeekableFSDataInputStream.<init>(SeekableFSDataInputStream.java:38)
        at org.apache.parquet.cli.BaseCommand.openSeekable(BaseCommand.java:240)
        at org.apache.parquet.cli.BaseCommand.getAvroSchema(BaseCommand.java:399)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:66)
        at org.apache.parquet.cli.Main.run(Main.java:163)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:193)

Expected Result:

  • should output data correctly for nested array type

Merging row groups throws panic when rows from one group are empty

Merging multiple sorted row groups will throw a panic when one of the groups returns empty rows during ReadRows, even if n is correctly returned as 0. The reason we sometimes return empty rows is that we apply filtering during the merge process, and occasionally no rows match the filter. I think this only happens when the entire row group is empty.

Here is a PR reproducing the issue: #67.

Dynamic schemas: panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {}

I am trying to read an existing parquet file and write the records into a new parquet file, but I am hitting the above error. Can someone help identify what I am doing wrong here? I have tried the same with ReadRows and WriteRows, where it works fine. My use case is that I would like to read records from existing parquet files, append new records, and write all merged records back to a parquet file. Alternatively, if there is a way to convert new records (not from a parquet file) to []parquet.Row, that would work too. Please suggest.

Appreciate your help!

package main

import (
	"fmt"
	"os"
	"reflect"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file1 := "file1.parquet"
	file2 := "file2.parquet"
	f1, err := os.Open(file1)
	if err != nil {
		panic(err)
	}
	defer f1.Close()

	s1, err := os.Stat(file1)
	if err != nil {
		panic(err)
	}

	p1, err := parquet.OpenFile(f1, s1.Size())
	if err != nil {
		panic(err)
	}
	if size := p1.Size(); size != s1.Size() {
		fmt.Errorf("file size mismatch: want=%d got=%d", s1.Size(), size)
		panic(err)
	}
	schema := p1.Schema()
	fmt.Println("file1 schema: ", schema)
	fmt.Println("file1 gotype", schema.GoType())

	reader := parquet.NewGenericReader[any](f1, &parquet.ReaderConfig{
		Schema: schema,
	})
	numRows := reader.NumRows()
	out := make([]any, numRows)
	n, err := reader.Read(out)
	if err != nil {
		panic(err)
	}
	fmt.Println("file1 out= ", n)
	fmt.Println("file1 out= ", reflect.TypeOf(out[0]))

	f2, err := os.Create(file2)
	if err != nil {
		panic(err)
	}
	defer f2.Close()

	pw := parquet.NewGenericWriter[any](f2, &parquet.WriterConfig{
		Compression: &parquet.Zstd,
		Schema:      schema,
	})
	written, err := pw.Write(out)
	if err != nil {
		panic(err)
	}
	fmt.Println("written:", written)
	if err := pw.Close(); err != nil {
		panic(err)
	}
	fmt.Println("file2 written successfully")
}

output:

{14:53}~/src/duckdb-test ➭ go run read-write.go
file1 schema:  message duckdb_schema {
        optional binary field1 (STRING);
        optional double field2;
}
file1 gotype struct { Field1 *[]uint8; Field2 *float64 }
file1 out=  264550
file1 out=  map[string]interface {}
panic: cannot create parquet value of type BYTE_ARRAY from go value of type interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.makeValue(0x6, 0x1400021b810, {0x104fe97a0?, 0x14001c182c0?, 0x140001cd968?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/value.go:339 +0xb4c
github.com/parquet-go/parquet-go.deconstructFuncOfLeaf.func1({0x140079a3b30, 0x2, 0x105688108?}, {0x20?, 0xb6?, 0xfd?}, {0x104fe97a0?, 0x14001c182c0?, 0x0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:538 +0x74
github.com/parquet-go/parquet-go.deconstructFuncOfOptional.func1({0x140079a3b30?, 0x2?, 0x2?}, {0x40?, 0xc2?, 0x1a?}, {0x104fe97a0?, 0x14001c182c0?, 0x0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:439 +0xdc
github.com/parquet-go/parquet-go.deconstructFuncOfGroup.func1({0x140079a3b30, 0x2, 0x2}, {0x54?, 0xe1?, 0xec?}, {0x104ff1240?, 0x140016231a0?, 0x14001c18201?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/row.go:515 +0x134
github.com/parquet-go/parquet-go.(*Schema).deconstructValueToColumns(0x140001ac240, {0x140079a3b30, 0x2, 0x2}, {0x104ff1240?, 0x140016231a0?, 0x1053377e0?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/schema.go:236 +0x128
github.com/parquet-go/parquet-go.(*Schema).Deconstruct(0x140001ac240, {0x0, 0x0, 0x0}, {0x104ff1240?, 0x140016231a0})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/schema.go:224 +0x1d8
github.com/parquet-go/parquet-go.(*Writer).Write(0x140000802a0, {0x104ff1240, 0x140016231a0})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:379 +0x1c4
github.com/parquet-go/parquet-go.(*GenericWriter[...]).writeAny(0x140000802a0?, {0x14000280000, 0x40, 0x140001cdcd8})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:233 +0x70
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write.func1(0x1050123e0?)
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:169 +0x60
github.com/parquet-go/parquet-go.(*writer).writeRows(0x140079b2000, 0x40966, 0x140001cdd58)
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:993 +0xb4
github.com/parquet-go/parquet-go.(*GenericWriter[...]).Write(0x105057520?, {0x14000280000?, 0x140001cde58?, 0x2?})
        /Users/anupamalolage/src/github.com/pkg/mod/github.com/parquet-go/[email protected]/writer.go:168 +0x58
main.main()
        src/duckdb-test/read-write.go:59 +0x478
exit status 2

GenericWriter should write map keys to matching columns

Given a parquet file with a schema generated from the following model:

type Inner struct {
	FieldB int
	FieldC string
}

type Model struct {
	FieldA string
	Nested Inner  
}

The GenericWriter should be able to write data from an alternative model, where Nested is represented as a map, as long as the map keys match column names from the original schema:

type AltModel struct {
	FieldA string
	Nested map[string]any
}

data := []AltModel{
	{
		FieldA: "a",
		Nested: map[string]any{"FieldB": 11, "FieldC": "c"},
	},
}

schema := parquet.SchemaOf(new(Model))
w := parquet.NewGenericWriter[AltModel](f, schema)
w.Write(data)

In its current implementation, GenericWriter panics for the above code. Here is a gist that reproduces the error.

This feature is useful when writing data to a schema where parts of the schema (i.e. the struct Inner) are defined dynamically at runtime.

Panic in page.go on bigendian arch

Hi, we are seeing the following error on IBM Z (64-bit) with Grafana Tempo and the Parquet format. What is weird is that the exact same setup works well on other architectures.

I have tried rebuilding Tempo with -tags purego and got the same panic.

Any help would be appreciated.

panic: runtime error: index out of range [7] with length 7

goroutine 1221 [running]:
github.com/segmentio/parquet-go.(*byteArrayPage).index(...)
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/page.go:982
github.com/segmentio/parquet-go.(*byteArrayDictionary).lookupString(0xc0002201c0, {0xc001108000, 0x1b, 0x400}, {{0xc001068000, 0x1b, 0x18}})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary_purego.go:42 +0x162
github.com/segmentio/parquet-go.(*byteArrayDictionary).Lookup(0xc0002201c0, {0xc001108000, 0x1b, 0x400}, {0xc001068000, 0x1b, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary.go:748 +0x142
github.com/segmentio/parquet-go.(*indexedPageValues).ReadValues(0xc000fe5490, {0xc001068000, 0x1b, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/dictionary.go:1332 +0xd6
github.com/segmentio/parquet-go.(*repeatedPageValues).ReadValues(0xc000aa3e40, {0xc001068000, 0x3e8, 0x3e8})
	/remote-source/tempo/app/vendor/github.com/segmentio/parquet-go/page_values.go:98 +0x192
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate.func3.2(0xc00115ff48, 0xc00115fec8, 0xc00018f740, {0xc001068000, 0x3e8, 0x3e8}, 0x3e8, {0x2815978, 0xc000ef6fc0}, {0x2829640, ...})
	/remote-source/tempo/app/pkg/parquetquery/iters.go:406 +0x26c
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate.func3(0xc00018f740, 0xc00115ff48, 0xc00115fec8, {0xc001068000, 0x3e8, 0x3e8}, 0x3e8, {0x2815978, 0xc000ef6fc0}, {0x2821868, ...})
	/remote-source/tempo/app/pkg/parquetquery/iters.go:459 +0x1e4
github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).iterate(0xc00018f740, {0x2815978, 0xc000ef6fc0}, 0x3e8)
	/remote-source/tempo/app/pkg/parquetquery/iters.go:464 +0x676
github.com/grafana/tempo/pkg/parquetquery.NewColumnIterator.func1()
	/remote-source/tempo/app/pkg/parquetquery/iters.go:299 +0x42
created by github.com/grafana/tempo/pkg/parquetquery.(*ColumnIterator).next
	/remote-source/tempo/app/pkg/parquetquery/iters.go:485 +0x60

Sorting writer is not able to create multiple row groups

I am trying to control the number of rows written per row group in an output file, but it does not seem to work. The input file has >570,000 records. I am not sure if I am missing anything in the code below. Can someone help please?

package main

import (
	"fmt"
	"os"
	"reflect"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file1 := "large_file.parquet"
	file2 := "large_file_control_row_group.parquet"
	f1, err := os.Open(file1)
	if err != nil {
		panic(err)
	}
	defer f1.Close()

	s1, err := os.Stat(file1)
	if err != nil {
		panic(err)
	}

	p1, err := parquet.OpenFile(f1, s1.Size())
	if err != nil {
		panic(err)
	}
	if size := p1.Size(); size != s1.Size() {
		fmt.Errorf("file size mismatch: want=%d got=%d", s1.Size(), size)
		panic(err)
	}
	schema := p1.Schema()
	fmt.Println("file1 schema: ", schema)
	fmt.Println("file1 gotype", schema.GoType())

	reader := parquet.NewGenericReader[any](f1, &parquet.ReaderConfig{
		Schema: schema,
	})
	numRows := reader.NumRows()
	out := make([]parquet.Row, 0)
	for _, rowgroup := range p1.RowGroups() {
		out1 := make([]parquet.Row, rowgroup.NumRows())
		_, err := reader.ReadRows(out1)
		if err != nil {
			panic(err)
		}
		out = append(out, out1...)
	}
	fmt.Println("file1 numRows", numRows)
	fmt.Println("file1 out= ", len(out))
	fmt.Println("file1 out= ", reflect.TypeOf(out[0]))

	f2, err := os.Create(file2)
	if err != nil {
		panic(err)
	}
	defer f2.Close()

	pw := parquet.NewSortingWriter[any](f2, int64(50000),
		&parquet.WriterConfig{
			//	PageBufferSize: 2560,
			Schema:      schema,
			Compression: &parquet.Zstd,
		})
	n, err := pw.WriteRows(out)
	if err != nil {
		panic( fmt.Sprintf("failed to write parquet file  err: ", file2, err))
	}
	fmt.Println("written rows to parquet file : ", n, file2)
	if err := pw.Close(); err != nil {
		panic( fmt.Sprintf("failed to close parquet file, err:", file2, err))
	}
	fmt.Println("closed parquet file", file2)
}

Allow opening a single RowGroup

The OpenFile function parses the page index of all row groups, which for large files (100GB+) can already take a couple of seconds. It would be great to have an API which allows the user to open a set of row groups instead of opening all of them.

Support writing 8-bit and 16-bit integers

Attempting to serialize signed or unsigned 8-bit or 16-bit integers results in: "panic: cannot convert Go values of type to parquet value". Having the ability to serialize smaller integers into parquet (presumably as physical int32 with a logical int(bit-width, sign) annotation) would help enforce logical bounds on numbers and reduce the in-memory footprint of values to be written (and read, if the same struct is reused for reading).

I'm happy to do most of the legwork on the implementation if needed, though I may need a bit of direction.

Add documentation about single row writes and compression

I was using the GenericWriter and writing one row at a time as I read in the data, thinking that compression would happen after calling Close. What actually happened was that writing the same HTML data 1,000 times, one row at a time, ended with a 192 MB file. Writing all the rows at once resulted in a 7 MB file, while also using 70% less RAM.

I'm not saying that this is a bug, but it would be helpful to have in the documentation. Maybe using GenericBuffer would have worked. I don't feel like I understand what happened well enough to add it to the readme. It feels like compression is applied for each Write call separately, but the output file said I had 1 row group, so I thought that meant the full row group was compressed together.

Thanks for a great library.

Looking for contributors

While Segment has invested a lot of resources in parquet-go, it is not a direction that we have the time or ability to maintain.

We are looking for contributors or maintainers who can help keep the library up to date. If you are interested or are using parquet-go in production, please reach out either here or via email at [email protected].

panic: reflect: call of reflect.Value.Field on zero Value

I'm trying to read records into a struct that contains a slice of elements with nested struct-pointers and seeing the following panic:

panic: reflect: call of reflect.Value.Field on zero Value

Below a test I wrote to reproduce the issue. It writes a single record to a parquet file, then tries to read it.

func TestNestedPointer(t *testing.T) {
	type InnerStruct struct {
		InnerField string
	}

	type SliceElement struct {
		Inner *InnerStruct
	}

	type Outer struct {
		Slice []*SliceElement
	}

	in := &Outer{
		Slice: []*SliceElement{
			{
				Inner: &InnerStruct{
					InnerField: "inner-string",
				},
			},
		},
	}

	tmpDir := t.TempDir()
	parquetFile := filepath.Join(tmpDir, "data.parquet")
	f, err := os.Create(parquetFile)
	if err != nil {
		t.Fatal(err)
	}

	pw := parquet.NewGenericWriter[*Outer](f)
	_, err = pw.Write([]*Outer{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	err = f.Close()
	if err != nil {
		t.Fatal(err)
	}

	f, err = os.Open(parquetFile)
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	pr := parquet.NewGenericReader[*Outer](f)

	out := make([]*Outer, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}

	pr.Close()
}

The code above works as expected if InnerStruct is not a pointer in the slice element.

	type SliceElement struct {
		Inner InnerStruct
	}

Any suggestions on how to fix this? I can't remove the pointers since I'm using generated protobuf messages which always use pointers. The pointers don't appear to be a problem in other places, only if used in a struct that's part of a slice from what I can tell. Any help with this is appreciated.

Prepare for v0.20

I have created a milestone to track things going into release v0.20 here https://github.com/parquet-go/parquet-go/milestone/1

This ticket is for gathering feedback in case there is a ticket or PR that I missed and someone would like to add. So far, only the sorting writer fix that @fpetkovski and I are looking into is pending.

@asubiotto, regarding #96: is it urgent/important to you? I can hunt down and help address the use-after-free bug that was corrupting your data. I'm always into perf stuff. Just let me know if you want it to be part of v0.20.

@fpetkovski @achille-roussel What do you think ?

Sorting for nested structure column

Hey, I have a problem with sorting a parquet file by a nested column.
For example, look at these structs:

type Data struct {
	ID    string
	Time  int64
	Value Value
}

type Value struct {
	Freq  float64
	Power float64
}

The column name string looks like "Value.Freq".

When I try to use
parquet.Ascending("Value.Freq")
it doesn't work.

Is there any other way to sort a parquet file by a nested value?

SortingWriter writes corrupted data for String and FixedLenByteArray

The SortingWriter writes corrupted data for String and FixedLenByteArray types for any meaningfully large number of rows. I think this has been present since the introduction of the SortingWriter. In the new tests in the sortingwriter-corruption branch (PR #24), sometimes the mismatched row is found at another index in the array (we found the row at index 106 in want), and sometimes it's just corrupted non-row data (got row index 43 isn't found in want rows, and is therefore corrupted data).

=== RUN   TestSortingWriterCorruptedString
    sorting_test.go:239: rows mismatch at index 42 :
    sorting_test.go:240:  want: parquet_test.Row{Tag:"NOjZhG1MpXf4naQFRqE25pCr4EszfVExuT9BGf4znMjAl82X081NXl51t7hYFh1ESB9HGrgtGns949cECbwr0WFcOQ7hQii7s418"}
    sorting_test.go:241:   got: parquet_test.Row{Tag:"zipEs765yzHuaW9s7YAXu23ORm8DgLpmlJeZLy7l4z5yBd4AqtXLfgnjiOarORSiywx8yuzgZwJBmwLu0XIjB1IOmkkMJdtgE91L"}
    sorting_test.go:247:   we found the row at index 106 in want.
    sorting_test.go:239: rows mismatch at index 43 :
    sorting_test.go:240:  want: parquet_test.Row{Tag:"ONxgPm4crYY7e5So1q5PJBZmoP6edxQHM5Qcb9iPmPBb1EPejC2gbNTCw3VYO1v4tDxey9OF4Dya6VdeuAVkyNG8xSmIXeLnHWDs"}
    sorting_test.go:241:   got: parquet_test.Row{Tag:"lJeZLy7l4z5yBd4AqtXLfgnjiOarORSiywx8yuzgZwJBmwLu0XIjB1IOmkkMJdtgE91Ly9OF4Dya6VdeuAVkyNG8xSmIXeLnHWDs"}
    sorting_test.go:253:   got row index 43 isn't found in want rows, and is therefore corrupted data.
    sorting_test.go:259: 2 rows mismatched out of 107 total
--- FAIL: TestSortingWriterCorruptedString (0.12s)

=== RUN   TestSortingWriterCorruptedFixedLenByteArray
    sorting_test.go:239: rows mismatch at index 168 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x63, 0x76, 0x6c, 0x6a, 0x70, 0x4d, 0x51, 0x76, 0x79, 0x71, 0x6b, 0x73, 0x32, 0x6c, 0x4d}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x53, 0x75, 0x7a, 0x77, 0x6b, 0x58, 0x44, 0x44, 0x76, 0x77, 0x32, 0x4d, 0x67, 0x4a, 0x77, 0x4a}}
    sorting_test.go:247:   we found the row at index 360 in want.
    sorting_test.go:239: rows mismatch at index 169 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x6c, 0x6b, 0x70, 0x77, 0x32, 0x56, 0x38, 0x6c, 0x33, 0x4e, 0x6f, 0x55, 0x50, 0x70, 0x53}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x31, 0x46, 0x6a, 0x35, 0x66, 0x59, 0x35, 0x56, 0x79, 0x58, 0x48, 0x45, 0x33, 0x6c, 0x41}}
    sorting_test.go:247:   we found the row at index 361 in want.
    sorting_test.go:239: rows mismatch at index 170 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x70, 0x53, 0x78, 0x35, 0x50, 0x4b, 0x34, 0x4f, 0x64, 0x6d, 0x36, 0x6f, 0x74, 0x30, 0x77}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x32, 0x6c, 0x66, 0x61, 0x42, 0x74, 0x37, 0x56, 0x71, 0x47, 0x74, 0x55, 0x42, 0x37, 0x70}}
    sorting_test.go:247:   we found the row at index 362 in want.
    sorting_test.go:239: rows mismatch at index 171 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x43, 0x7a, 0x64, 0x68, 0x70, 0x38, 0x47, 0x5a, 0x47, 0x58, 0x6d, 0x4e, 0x68, 0x43, 0x4c, 0x52}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x46, 0x4d, 0x55, 0x50, 0x5a, 0x4c, 0x6d, 0x62, 0x36, 0x6c, 0x57, 0x52, 0x7a, 0x6d, 0x4c}}
    sorting_test.go:247:   we found the row at index 363 in want.
    sorting_test.go:239: rows mismatch at index 172 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x39, 0x4e, 0x6d, 0x79, 0x36, 0x47, 0x71, 0x6a, 0x76, 0x6b, 0x32, 0x4b, 0x6e, 0x30, 0x78}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4a, 0x47, 0x49, 0x72, 0x4b, 0x6d, 0x4e, 0x48, 0x50, 0x57, 0x63, 0x4b, 0x31, 0x78, 0x77}}
    sorting_test.go:247:   we found the row at index 364 in want.
    sorting_test.go:239: rows mismatch at index 173 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x43, 0x79, 0x52, 0x46, 0x4b, 0x6c, 0x34, 0x5a, 0x79, 0x61, 0x71, 0x50, 0x6d, 0x6e, 0x34}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4b, 0x42, 0x47, 0x78, 0x6e, 0x64, 0x69, 0x49, 0x31, 0x38, 0x4d, 0x77, 0x67, 0x4c, 0x76}}
    sorting_test.go:247:   we found the row at index 365 in want.
    sorting_test.go:239: rows mismatch at index 174 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x4b, 0x4d, 0x6f, 0x37, 0x4e, 0x79, 0x43, 0x43, 0x44, 0x65, 0x67, 0x62, 0x43, 0x30, 0x4c}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x4e, 0x33, 0x51, 0x48, 0x5a, 0x32, 0x73, 0x41, 0x47, 0x68, 0x79, 0x6e, 0x72, 0x63, 0x32}}
    sorting_test.go:247:   we found the row at index 366 in want.
    sorting_test.go:239: rows mismatch at index 175 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x53, 0x6c, 0x51, 0x65, 0x65, 0x79, 0x39, 0x52, 0x59, 0x59, 0x75, 0x4c, 0x32, 0x63, 0x50}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x51, 0x7a, 0x71, 0x7a, 0x64, 0x4f, 0x4a, 0x5a, 0x37, 0x38, 0x51, 0x43, 0x45, 0x43, 0x36}}
    sorting_test.go:247:   we found the row at index 367 in want.
    sorting_test.go:239: rows mismatch at index 176 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x5a, 0x4d, 0x38, 0x51, 0x75, 0x55, 0x58, 0x6a, 0x4d, 0x79, 0x58, 0x71, 0x67, 0x77, 0x4a}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x56, 0x39, 0x44, 0x63, 0x35, 0x71, 0x79, 0x6b, 0x4e, 0x70, 0x36, 0x54, 0x4a, 0x64, 0x48}}
    sorting_test.go:247:   we found the row at index 368 in want.
    sorting_test.go:239: rows mismatch at index 177 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x66, 0x31, 0x53, 0x4d, 0x50, 0x39, 0x51, 0x50, 0x57, 0x35, 0x64, 0x4b, 0x4f, 0x47, 0x69}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x61, 0x67, 0x7a, 0x41, 0x49, 0x33, 0x41, 0x56, 0x73, 0x45, 0x6c, 0x41, 0x73, 0x36, 0x69}}
    sorting_test.go:247:   we found the row at index 369 in want.
    sorting_test.go:239: rows mismatch at index 178 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x66, 0x41, 0x75, 0x69, 0x48, 0x76, 0x77, 0x56, 0x69, 0x52, 0x43, 0x56, 0x56, 0x77, 0x45}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x62, 0x64, 0x35, 0x5a, 0x5a, 0x65, 0x72, 0x73, 0x64, 0x44, 0x30, 0x58, 0x6d, 0x31, 0x35}}
    sorting_test.go:247:   we found the row at index 370 in want.
    sorting_test.go:239: rows mismatch at index 179 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x6b, 0x30, 0x47, 0x47, 0x49, 0x5a, 0x57, 0x7a, 0x73, 0x7a, 0x5a, 0x34, 0x57, 0x72, 0x47}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x6d, 0x6c, 0x69, 0x55, 0x70, 0x49, 0x30, 0x74, 0x6a, 0x50, 0x64, 0x47, 0x6a, 0x4a, 0x35}}
    sorting_test.go:247:   we found the row at index 371 in want.
    sorting_test.go:239: rows mismatch at index 180 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x6d, 0x4b, 0x36, 0x43, 0x77, 0x41, 0x6e, 0x6f, 0x6f, 0x4e, 0x32, 0x51, 0x4a, 0x38, 0x71}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x54, 0x6e, 0x38, 0x35, 0x6c, 0x44, 0x6b, 0x68, 0x62, 0x67, 0x6a, 0x70, 0x69, 0x6a, 0x37, 0x61}}
    sorting_test.go:247:   we found the row at index 372 in want.
    sorting_test.go:239: rows mismatch at index 181 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x74, 0x39, 0x59, 0x74, 0x6b, 0x41, 0x70, 0x4e, 0x6f, 0x36, 0x33, 0x57, 0x72, 0x69, 0x4f}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x33, 0x58, 0x45, 0x6e, 0x43, 0x57, 0x77, 0x51, 0x30, 0x4f, 0x52, 0x48, 0x42, 0x70, 0x51}}
    sorting_test.go:247:   we found the row at index 373 in want.
    sorting_test.go:239: rows mismatch at index 182 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x76, 0x47, 0x4f, 0x54, 0x66, 0x62, 0x58, 0x4f, 0x79, 0x53, 0x65, 0x63, 0x47, 0x56, 0x62}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x34, 0x57, 0x71, 0x77, 0x41, 0x67, 0x33, 0x38, 0x4e, 0x50, 0x70, 0x61, 0x75, 0x6e, 0x52}}
    sorting_test.go:247:   we found the row at index 374 in want.
    sorting_test.go:239: rows mismatch at index 183 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x44, 0x78, 0x6f, 0x39, 0x71, 0x74, 0x36, 0x57, 0x37, 0x48, 0x6e, 0x49, 0x64, 0x62, 0x71, 0x74}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x38, 0x71, 0x6d, 0x76, 0x77, 0x33, 0x36, 0x69, 0x6d, 0x38, 0x70, 0x45, 0x44, 0x4c, 0x48}}
    sorting_test.go:247:   we found the row at index 375 in want.
    sorting_test.go:239: rows mismatch at index 184 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x38, 0x55, 0x4a, 0x71, 0x69, 0x70, 0x63, 0x58, 0x54, 0x46, 0x52, 0x4b, 0x73, 0x39, 0x65}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x4a, 0x44, 0x6e, 0x58, 0x38, 0x4a, 0x58, 0x59, 0x38, 0x61, 0x43, 0x6a, 0x50, 0x63, 0x4b}}
    sorting_test.go:247:   we found the row at index 376 in want.
    sorting_test.go:239: rows mismatch at index 185 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x44, 0x4a, 0x30, 0x36, 0x4e, 0x70, 0x46, 0x4c, 0x67, 0x68, 0x66, 0x4e, 0x62, 0x66, 0x4b}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x52, 0x69, 0x66, 0x43, 0x68, 0x52, 0x72, 0x39, 0x53, 0x42, 0x67, 0x7a, 0x4f, 0x63, 0x56}}
    sorting_test.go:247:   we found the row at index 377 in want.
    sorting_test.go:239: rows mismatch at index 186 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4c, 0x78, 0x4f, 0x54, 0x57, 0x4b, 0x59, 0x73, 0x44, 0x57, 0x63, 0x68, 0x34, 0x34, 0x32}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x57, 0x36, 0x76, 0x37, 0x76, 0x4b, 0x6e, 0x69, 0x68, 0x54, 0x46, 0x48, 0x59, 0x41, 0x73}}
    sorting_test.go:247:   we found the row at index 378 in want.
    sorting_test.go:239: rows mismatch at index 187 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4d, 0x6b, 0x47, 0x4b, 0x6a, 0x69, 0x30, 0x58, 0x68, 0x38, 0x6e, 0x6c, 0x6e, 0x79, 0x42}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x58, 0x65, 0x4f, 0x56, 0x44, 0x6f, 0x72, 0x32, 0x78, 0x70, 0x65, 0x57, 0x56, 0x49, 0x51}}
    sorting_test.go:247:   we found the row at index 379 in want.
    sorting_test.go:239: rows mismatch at index 188 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x4e, 0x54, 0x71, 0x63, 0x37, 0x4e, 0x78, 0x47, 0x76, 0x63, 0x54, 0x5a, 0x59, 0x49, 0x64}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x64, 0x5a, 0x7a, 0x55, 0x6b, 0x55, 0x33, 0x41, 0x4b, 0x43, 0x33, 0x4c, 0x34, 0x6c, 0x4b}}
    sorting_test.go:247:   we found the row at index 380 in want.
    sorting_test.go:239: rows mismatch at index 189 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x70, 0x6f, 0x69, 0x4f, 0x7a, 0x34, 0x77, 0x47, 0x4e, 0x68, 0x63, 0x78, 0x56, 0x72, 0x70}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x65, 0x31, 0x4b, 0x69, 0x36, 0x55, 0x4d, 0x6e, 0x51, 0x75, 0x53, 0x53, 0x79, 0x44, 0x37}}
    sorting_test.go:247:   we found the row at index 381 in want.
    sorting_test.go:239: rows mismatch at index 190 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x71, 0x6c, 0x77, 0x67, 0x56, 0x32, 0x73, 0x42, 0x34, 0x62, 0x43, 0x44, 0x4c, 0x30, 0x51}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x55, 0x72, 0x58, 0x32, 0x56, 0x73, 0x63, 0x71, 0x49, 0x37, 0x69, 0x48, 0x53, 0x33, 0x72, 0x64}}
    sorting_test.go:247:   we found the row at index 382 in want.
    sorting_test.go:239: rows mismatch at index 191 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x45, 0x74, 0x54, 0x4e, 0x63, 0x4e, 0x78, 0x54, 0x4b, 0x7a, 0x38, 0x6c, 0x77, 0x65, 0x57, 0x6e}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x56, 0x32, 0x62, 0x54, 0x39, 0x72, 0x76, 0x73, 0x62, 0x45, 0x63, 0x72, 0x57, 0x48, 0x5a, 0x36}}
    sorting_test.go:247:   we found the row at index 383 in want.
    sorting_test.go:239: rows mismatch at index 378 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x57, 0x36, 0x76, 0x37, 0x76, 0x4b, 0x6e, 0x69, 0x68, 0x54, 0x46, 0x48, 0x59, 0x41, 0x73}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x51, 0x74, 0x55, 0x37, 0x54, 0x48, 0x47, 0x69, 0x74, 0x67, 0x57, 0x4d, 0x7a, 0x31, 0x37}}
    sorting_test.go:247:   we found the row at index 570 in want.
    sorting_test.go:239: rows mismatch at index 379 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x58, 0x65, 0x4f, 0x56, 0x44, 0x6f, 0x72, 0x32, 0x78, 0x70, 0x65, 0x57, 0x56, 0x49, 0x51}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x53, 0x32, 0x59, 0x61, 0x5a, 0x67, 0x6c, 0x53, 0x72, 0x50, 0x57, 0x37, 0x45, 0x67, 0x79}}
    sorting_test.go:247:   we found the row at index 571 in want.
    sorting_test.go:239: rows mismatch at index 380 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x64, 0x5a, 0x7a, 0x55, 0x6b, 0x55, 0x33, 0x41, 0x4b, 0x43, 0x33, 0x4c, 0x34, 0x6c, 0x4b}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x5a, 0x73, 0x4a, 0x75, 0x41, 0x38, 0x70, 0x49, 0x6b, 0x4f, 0x47, 0x7a, 0x67, 0x77, 0x65}}
    sorting_test.go:247:   we found the row at index 572 in want.
    sorting_test.go:239: rows mismatch at index 381 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x65, 0x31, 0x4b, 0x69, 0x36, 0x55, 0x4d, 0x6e, 0x51, 0x75, 0x53, 0x53, 0x79, 0x44, 0x37}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x64, 0x39, 0x61, 0x45, 0x72, 0x44, 0x42, 0x49, 0x4f, 0x6a, 0x55, 0x70, 0x77, 0x6b, 0x43}}
    sorting_test.go:247:   we found the row at index 573 in want.
    sorting_test.go:239: rows mismatch at index 382 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x55, 0x72, 0x58, 0x32, 0x56, 0x73, 0x63, 0x71, 0x49, 0x37, 0x69, 0x48, 0x53, 0x33, 0x72, 0x64}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x6f, 0x43, 0x74, 0x49, 0x32, 0x6f, 0x4c, 0x78, 0x48, 0x7a, 0x71, 0x61, 0x41, 0x41, 0x50}}
    sorting_test.go:247:   we found the row at index 574 in want.
    sorting_test.go:239: rows mismatch at index 383 :
    sorting_test.go:240:  want: parquet_test.Row{ID:[16]uint8{0x56, 0x32, 0x62, 0x54, 0x39, 0x72, 0x76, 0x73, 0x62, 0x45, 0x63, 0x72, 0x57, 0x48, 0x5a, 0x36}}
    sorting_test.go:241:   got: parquet_test.Row{ID:[16]uint8{0x6d, 0x70, 0x36, 0x48, 0x47, 0x56, 0x51, 0x6d, 0x4c, 0x43, 0x55, 0x41, 0x49, 0x35, 0x33, 0x77}}
    sorting_test.go:247:   we found the row at index 575 in want.
    sorting_test.go:259: 30 rows mismatched out of 700 total
--- FAIL: TestSortingWriterCorruptedFixedLenByteArray (0.00s)

Data corruption in page with CRC32 checksum of value 0

A data page with a CRC32 checksum of 0 seems to trigger a false-positive corruption check. The error is ErrCorrupted, which is returned when reading the page from the file.

The data page where this happens belongs to a string column with a dictionary (see definition). While trying to debug this error, we noticed that the CRC32 checksum computed for the data page is 0. While that is a valid CRC32 value, further debugging showed that the CRC stored in the header of the page is the same as the one in the dictionary page of that column, which seems very suspicious.

Disabling the checksum verification allows the page's values to be read normally, which suggests that the data may not actually be corrupted.

I wasn't able to get much further than this. Is it possible that there is a check where, if the CRC is equal to 0, a different value is used? I've tried a couple of things, such as manually disabling caching of PageHeaders and manually disabling caching of the encoders, in case an object reused during reads carried data from previous uses, but didn't find anything.
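To make the question concrete, here is a purely hypothetical sketch of the suspected failure mode; verifyPageChecksum is not a parquet-go function, it only illustrates why a checksum of 0 must not be treated as a special value (imports assumed: fmt and hash/crc32).

// Hypothetical illustration, not library code: 0 is a legitimate CRC32 output,
// so any path that treats a zero checksum as "unset", or substitutes a cached
// header value for it, can yield false ErrCorrupted results on pages whose
// real checksum happens to be 0.
func verifyPageChecksum(headerCRC uint32, pageData []byte) error {
	if got := crc32.ChecksumIEEE(pageData); got != headerCRC {
		return fmt.Errorf("page checksum mismatch: header=%#x computed=%#x", headerCRC, got)
	}
	return nil
}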

LIST and MAP types not properly preserved in read after write

See the following code that writes, then reads, a parquet file. The schema doesn't seem to preserve the LIST and MAP types:

package main

import (
	"github.com/parquet-go/parquet-go"
	"log"
	"os"
)

func main() {
	fileName := "schema-issue.parquet"

	var root parquet.Node = parquet.Group{
		"aaa": parquet.Uint(32),
		"bbb": parquet.List(
			parquet.List(
				parquet.String(),
			),
		),
		"ccc": parquet.Group{
			"ddd": parquet.Leaf(parquet.ByteArrayType),
			"eee": parquet.Leaf(parquet.BooleanType),
		},
		"fff": parquet.Map(
			parquet.Int(32),
			parquet.Group{
				"ggg": parquet.Date(),
			},
		),
	}
	writtenSchema := parquet.NewSchema("mySchema", root)

	writtenFile, _ := os.Create(fileName)
	writer := parquet.NewGenericWriter[any](writtenFile, writtenSchema)
	writer.Close()

	openedFile, _ := os.Open(fileName)
	reader := parquet.NewReader(openedFile)

	log.Printf("Written schema: %s", writtenSchema.String())
	log.Printf("Opened schema: %s", reader.Schema().String())
}

Here's the output:

Written schema: message mySchema {
        required int32 aaa (INT(32,false));
        required group bbb (LIST) {
                repeated group list {
                        required group element (LIST) {
                                repeated group list {
                                        required binary element (STRING);
                                }
                        }
                }
        }
        required group ccc {
                required binary ddd;
                required boolean eee;
        }
        required group fff (MAP) {
                repeated group key_value {
                        required int32 key (INT(32,true));
                        required group value {
                                required int32 ggg (DATE);
                        }
                }
        }
}

Opened schema: message mySchema {
        required int32 aaa (INT(32,false));
        required group bbb {
                repeated group list {
                        required group element {
                                repeated group list {
                                        required binary element (STRING);
                                }
                        }
                }
        }
        required group ccc {
                required binary ddd;
                required boolean eee;
        }
        required group fff {
                repeated group key_value {
                        required int32 key (INT(32,true));
                        required group value {
                                required int32 ggg (DATE);
                        }
                }
        }
}

Note how the LIST and MAP annotations are missing from the schema that was read back.

Is this a bug, or am I maybe missing something?
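If it helps turn the repro into a regression test, a simple assertion on the two schema strings (both already produced in the example above) makes the mismatch fail loudly:

// Compare the round-tripped schema against the written one; with the current
// behaviour this fails because the LIST and MAP annotations are dropped.
if got, want := reader.Schema().String(), writtenSchema.String(); got != want {
	log.Fatalf("schema round-trip mismatch:\n got: %s\nwant: %s", got, want)
}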

ARM64 runners

Previously we were sharing runners with other internal Segment projects. At the moment we don't have a way to run ARM64 tests in a CI environment.

parquet-go can read nested objects, but the parquet CLI cannot (Parquet/Avro schema mismatch)

Hello,
Copying the issue from the previous repo (segmentio#483); I just tested against 0.19 and the issue is still present.

I'm seeing a weird situation which I can't explain. I've created a parquet file in Go, I can read it in Go, and I can read the schema from the CLI, but I get an error when I cat it.

Note: I've never used the parquet command before, so this could be user error; I'm using the Homebrew-installed parquet-cli.
Note 2: if I remove the nested structure, it works fine.

code:

package main

import (
	"bytes"
	"flag"
	"os"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/golang/glog"
	"github.com/parquet-go/parquet-go"
)

type ParquetOtherElements struct {
	ElementId   string   `parquet:"elementId"`
	Type        string   `parquet:"type,enum"`
	RehomedFrom []string `parquet:"rehomedFrom,list"`
}

type ParquetElement struct {
	ElementId string `parquet:"elementId"`
	Type      string `parquet:"type,enum"`
	Label     string `parquet:"label"`
	Status    string `parquet:"status,enum"`
	Element   string `parquet:"element,json"`
}

type ParquetStruct struct {
	TenantId             string                 `parquet:"tenantId"`
	TriggerProviderId    string                 `parquet:"triggerProviderId"`
	TriggerEventId       string                 `parquet:"triggerEventId"`
	TriggerProcessorType string                 `parquet:"triggerProcessorType"`
	EventTime            int64                  `parquet:"eventTime,timestamp(nanosecond)"`
	EventFlagged         bool                   `parquet:"eventFlagged"`
	Nodes                []ParquetElement       `parquet:"nodes,list"`
	Edges                []ParquetElement       `parquet:"edges,list"`
	OtherElements        []ParquetOtherElements `parquet:"otherElements,optional,list"`
}

func main() {
	flag.Set("alsologtostderr", "true")
	flag.Set("v", "3")
	flag.Parse()

	content := new(bytes.Buffer)
	w := parquet.NewGenericWriter[ParquetStruct](content)
	p := []ParquetStruct{
		{
			TenantId:             "tenantIdV",
			TriggerProviderId:    "triggerProviderIdV",
			TriggerEventId:       "triggerEventIdV",
			TriggerProcessorType: "triggerProcessorTypeV",
			EventFlagged:         false,
			EventTime:            time.Now().UnixNano(),
			Nodes:                []ParquetElement{{ElementId: "elementIdV", Type: "typeV", Label: "labelV", Status: "statusV", Element: "{}"}},
		},
	}
	spew.Dump(p)
	if _, err := w.Write(p); err != nil {
		glog.Fatal(err)
	}
	if err := w.Close(); err != nil {
		glog.Fatal(err)
	}

	// write file from content buffer
	f, err := os.Create("myfile.parquet")
	if err != nil {
		glog.Fatal(err)
	}
	defer f.Close()
	if _, err := f.Write(content.Bytes()); err != nil {
		glog.Fatal(err)
	}

	glog.Infoln("reading buffer....")
	file := bytes.NewReader(content.Bytes())
	rows, err := parquet.Read[ParquetStruct](file, file.Size())
	if err != nil {
		glog.Fatal(err)
	}
	spew.Dump(rows)
}
 bob  ~/scratch/parquet  go run .                                                                                                                                                                                                                                                                                              
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) <nil>,
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
I0307 16:36:32.428573   62343 main.go:76] reading buffer....
([]main.ParquetStruct) (len=1 cap=1) {
 (main.ParquetStruct) {
  TenantId: (string) (len=9) "tenantIdV",
  TriggerProviderId: (string) (len=18) "triggerProviderIdV",
  TriggerEventId: (string) (len=15) "triggerEventIdV",
  TriggerProcessorType: (string) (len=21) "triggerProcessorTypeV",
  EventTime: (int64) 1678235792424865000,
  EventFlagged: (bool) false,
  Nodes: ([]main.ParquetElement) (len=1 cap=1) {
   (main.ParquetElement) {
    ElementId: (string) (len=10) "elementIdV",
    Type: (string) (len=5) "typeV",
    Label: (string) (len=6) "labelV",
    Status: (string) (len=7) "statusV",
    Element: (string) (len=2) "{}"
   }
  },
  Edges: ([]main.ParquetElement) {
  },
  OtherElements: ([]main.ParquetOtherElements) <nil>
 }
}
 bob  ~/scratch/parquet  parquet cat myfile.parquet                                                                                                                                                                                                                                                                           
Unknown error
java.lang.RuntimeException: Failed on record 0
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:86)
        at org.apache.parquet.cli.Main.run(Main.java:157)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
        at org.apache.parquet.cli.Main.main(Main.java:187)
Caused by: org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'elementId' not found
        at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:221)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:126)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:228)
        at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:74)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:539)
        at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:489)
        at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:293)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:137)
        at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:91)
        at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
        at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:142)
        at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:190)
        at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:166)
        at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
        at org.apache.parquet.cli.BaseCommand$1$1.advance(BaseCommand.java:363)
        at org.apache.parquet.cli.BaseCommand$1$1.<init>(BaseCommand.java:344)
        at org.apache.parquet.cli.BaseCommand$1.iterator(BaseCommand.java:342)
        at org.apache.parquet.cli.commands.CatCommand.run(CatCommand.java:73)
        ... 3 more
 1  bob  ~/scratch/parquet  parquet schema myfile.parquet                                                                                                                                                                                                                                                                     
{
  "type" : "record",
  "name" : "ParquetStruct",
  "fields" : [ {
    "name" : "tenantId",
    "type" : "string"
  }, {
    "name" : "triggerProviderId",
    "type" : "string"
  }, {
    "name" : "triggerEventId",
    "type" : "string"
  }, {
    "name" : "triggerProcessorType",
    "type" : "string"
  }, {
    "name" : "eventTime",
    "type" : "long"
  }, {
    "name" : "eventFlagged",
    "type" : "boolean"
  }, {
    "name" : "nodes",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "edges",
    "type" : {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list2",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element2",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "label",
              "type" : "string"
            }, {
              "name" : "status",
              "type" : "string"
            }, {
              "name" : "element",
              "type" : "bytes"
            } ]
          }
        } ]
      }
    }
  }, {
    "name" : "otherElements",
    "type" : [ "null", {
      "type" : "array",
      "items" : {
        "type" : "record",
        "name" : "list",
        "namespace" : "list3",
        "fields" : [ {
          "name" : "element",
          "type" : {
            "type" : "record",
            "name" : "element",
            "namespace" : "element3",
            "fields" : [ {
              "name" : "elementId",
              "type" : "string"
            }, {
              "name" : "type",
              "type" : "string"
            }, {
              "name" : "rehomedFrom",
              "type" : {
                "type" : "array",
                "items" : {
                  "type" : "record",
                  "name" : "list",
                  "namespace" : "list4",
                  "fields" : [ {
                    "name" : "element",
                    "type" : "string"
                  } ]
                }
              }
            } ]
          }
        } ]
      }
    } ],
    "default" : null
  } ]
}
 bob  ~/scratch/parquet 

Support for protobuf messages

It'd be great if this library were able to support reading and writing protobuf messages. Protobuf structs are generated, so there is no (clean) way to control field tags, which would be necessary to set field names to something other than camelCase. There are, however, json: tags added by default, as in the example below:

type SomeMessage struct {
  ...
  UserAgent string                 `protobuf:"bytes,7,opt,name=user_agent,json=userAgent,proto3" json:"user_agent,omitempty"`
}

Perhaps it'd be possible to add a function similar to SchemaOf() that would generate a schema for a protobuf, with options to control field names.
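One possible shape for such a helper, sketched purely as an illustration: walk the message's protoreflect descriptor and build a parquet.Group by hand, so the parquet field names follow the protobuf field names rather than the generated Go struct fields. schemaFromProto is a hypothetical name, the kind-to-node mapping is deliberately incomplete, and none of this is an existing parquet-go API.

import (
	"github.com/parquet-go/parquet-go"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/reflect/protoreflect"
)

// schemaFromProto (hypothetical) derives a parquet schema from a protobuf
// message descriptor, using the original field names such as "user_agent"
// instead of the generated Go names such as "UserAgent".
func schemaFromProto(msg proto.Message) *parquet.Schema {
	desc := msg.ProtoReflect().Descriptor()
	group := parquet.Group{}

	fields := desc.Fields()
	for i := 0; i < fields.Len(); i++ {
		f := fields.Get(i)

		var node parquet.Node
		switch f.Kind() {
		case protoreflect.StringKind:
			node = parquet.String()
		case protoreflect.Int32Kind, protoreflect.Sint32Kind:
			node = parquet.Int(32)
		case protoreflect.Int64Kind, protoreflect.Sint64Kind:
			node = parquet.Int(64)
		case protoreflect.BoolKind:
			node = parquet.Leaf(parquet.BooleanType)
		default:
			continue // other kinds are skipped in this sketch
		}
		if f.IsList() {
			node = parquet.List(node)
		}
		group[string(f.Name())] = node
	}
	return parquet.NewSchema(string(desc.Name()), group)
}

Populating rows would still need either a dynamically generated struct or the row-level APIs, which this sketch does not cover.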

List-type columns should be able to write null in parquet file #513

Description:

We have a program that reads a JSON file, does some processing, then writes a new parquet file. Problems occur when there are list-type columns.

Originally, the list-type column may have a null value in the JSON source (e.g. {"list":null}), and we want the parquet file to preserve this characteristic (remain null if the JSON source is null). However, we cannot achieve this using the parquet struct tags this library currently provides.

package main

import (
	"encoding/json"
	"log"

	"github.com/parquet-go/parquet-go"
)

type RowType1 struct {
	ListTag []int32 `json:"list_tag" parquet:"list_tag,list"`
}

func main() {
	var rows []RowType1
	strs := []string{
		`{}`,
		`{"list_tag":null}`,
		`{"list_tag":[]}`,
		`{"list_tag":[1,2]}`,
	}
	for _, s := range strs {
		var r RowType1
		json.Unmarshal([]byte(s), &r)
		rows = append(rows, r)
	}
	if err := parquet.WriteFile("file.parquet", rows); err != nil {
		log.Fatalln(err)
	}
}

// The result is printed by `pqrs cat file.parquet`
// {list_tag: []} -> expect {list_tag: null}
// {list_tag: []} -> expect {list_tag: null}
// {list_tag: []}
// {list_tag: [1, 2]}

I have tried the following methods:

  • make the list-type columns optional
type RowType1 struct {
	ListTag []int32 `json:"list_tag" parquet:"list_tag,list,optional"`
}

However, it seems that all rows end up as either null or empty lists, regardless of whether the original JSON has a value (e.g. {"list":[1,2]}) or not. (I wonder whether list + optional is simply not expected to work together, or whether this is a bug.)

  • make the list-type columns a pointer
type RowType1 struct {
	ListTag *[]int32 `json:"list_tag" parquet:"list_tag,list"`
}

// panic: list is an invalid parquet tag: list_tag *[]int32 [list]
However, it seems that the list tag currently only supports slice types.

Expected Result:

  • have a way to write parquet with a null value for the list-type column

Is there any way to achieve this?

Unable to read nested map value with schema generated from NewSchema()

When reading parquet data that contains a map with a nested value, using a schema generated with parquet.NewSchema(), I'm seeing a panic from the library. The panic happens during the read (writing is fine):

panic: reflect: call of reflect.Value.MapIndex on struct Value

The following test illustrates the issue.

func TestMapValue(t *testing.T) {
	// Main struct used to marshal/unmarshal data.
	type NestedStruct struct {
		Val string
	}
	type MapValue struct {
		Nested NestedStruct
	}
	type Message struct {
		TestMap map[string]MapValue
	}

	testKey, testValue := "test-key", "test-value"
	in := Message{
		TestMap: map[string]MapValue{
			"test-key": {
				Nested: NestedStruct{
					Val: testValue,
				},
			},
		},
	}

	var f bytes.Buffer

	// Generate a schema that matches Message exactly.
	schema := parquet.NewSchema("Message", parquet.Group{
		"TestMap": parquet.Group{
			"key_value": parquet.Repeated(parquet.Group{
				"key": parquet.String(),
				"value": parquet.Group{
					"Nested": parquet.Group{
						"Val": parquet.String(),
					},
				},
			}),
		},
	})

	pw := parquet.NewGenericWriter[Message](&f, schema)
	_, err := pw.Write([]Message{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	// The goal here is to not use a schema generated from Message with
	// parquet.SchemaOf(Message), but one made with parquet.NewSchema(...).
	pr := parquet.NewGenericReader[*Message](bytes.NewReader(f.Bytes()), schema)

	out := make([]*Message, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}
	pr.Close()
	if want, got := testValue, out[0].TestMap[testKey].Nested.Val; want != got {
		t.Error("failed to read map value")
	}
}

Is it possible to write the schema in such a way that the test above passes? In my case I cannot use parquet.SchemaOf(Message), as the Message type is not available/usable for this purpose.

Test failure

While attempting to get the tests passing in order to merge a change that rewrites references, I get the following error:

FAIL: TestOpenFile (2.01s)
    --- FAIL: TestOpenFile/testdata/issue368.parquet (0.00s)
        file_test.go:38: reading parquet file metadata: decoding thrift payload: -10:FIELD<?>: skipping unsupported thrift type 15
FAIL
FAIL	github.com/parquet-go/parquet-go	39.395s
ok  	github.com/parquet-go/parquet-go/bloom	0.027s
ok  	github.com/par

Consider appending pages to on-disk column buffers when writing

The Parquet writer can use filesystem buffers to offload encoded and compressed pages to disk before writing them to the target file. When using FS buffers, we often hit the limit on the number of open file descriptors, and I suspect it's because each page is flushed to a separate file.

I wonder if we can keep one file per column and append to the same file when writing. During the writer flush process, we could copy column files back to back into the target parquet file.

I will add a "need more info" label since I am not sure if this approach is feasible.

I'm sorry

Based on #1, I was under the wrong impression that I was the only one who had stepped up to temporarily take over maintaining this project.

Fixing issues and ensuring the health of the library has been my core objective. I raised a ticket outlining the first steps I had in mind for the project here: #19.

Unfortunately, I was operating under the assumption that I could make these judgement calls on my own while executing my duties. I didn't realize there were other maintainers out there who were still actively involved; for that I'm terribly sorry.

I love this project and I want it to thrive. If my actions have hurt the project in any way, I apologise to anyone affected by them.

For those who were wondering:

I merged a fix for a critical issue (#31) without giving the other maintainers time to review it.

That being said, I have some questions.

  • Is there governance for this project that I'm not aware of?
  • Is there a list of active maintainers?
  • Should I step down?
  • What should be done moving forward? I'm happy to revert all non-approved changes, including removing the release; just please let me know who I should talk to.

Thanks and have a good day.

zstd encoding is not useful

Given the following struct:

type Meta struct {
	Name        string   `parquet:",zstd,dict"`
	PeriodType  string   `parquet:",zstd,dict"`
	PeriodUnit  string   `parquet:",zstd,dict"`
	SampleType  string   `parquet:",zstd,dict"`
	SampleUnit  string   `parquet:",zstd,dict"`
	Timestamp   int64    `parquet:",zstd,timestamp,delta"` // time.Time is not supported
	Duration    int64    `parquet:",zstd,delta"`
	Period      int64    `parquet:",zstd,delta"`
	ProfileID   string   `parquet:",zstd,"`
	LabelsKey   []string `parquet:",zstd,list,dict"` // this value definition level is always 1
	LabelsValue []string `parquet:",zstd,list,dict"`
	TotalValue  int64    `parquet:",zstd"`
	SampleLen   int64    `parquet:",zstd"`
}

and writing with:

parquet.NewGenericWriter[Meta](&b)

I find that the zstd compression is not effective.
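Before concluding that the codec has no effect, it may help to confirm what was actually written. The sketch below is a hedged diagnostic, not a fix: it opens the produced file and prints each column's codec plus compressed vs. uncompressed sizes. The file name is a placeholder, and the format.* field names are assumed to mirror the Parquet Thrift metadata exposed by the format subpackage. Dictionary-encoded columns with few distinct values are already small, so zstd may legitimately have little left to compress.

package main

import (
	"log"
	"os"

	"github.com/parquet-go/parquet-go"
)

func main() {
	f, err := os.Open("meta.parquet") // placeholder for wherever the buffer was written
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		log.Fatal(err)
	}

	pf, err := parquet.OpenFile(f, stat.Size())
	if err != nil {
		log.Fatal(err)
	}
	for _, rg := range pf.Metadata().RowGroups {
		for _, col := range rg.Columns {
			md := col.MetaData
			log.Printf("%v codec=%v compressed=%d uncompressed=%d",
				md.PathInSchema, md.Codec, md.TotalCompressedSize, md.TotalUncompressedSize)
		}
	}
}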

map/list are not properly supported when using `any` and `GenericReader`

Hello,

It seems to me that there is a discrepancy when reading maps or lists through the generic any type.
Here is a simple test case:

func TestParquetReaderSegmentRepro(t *testing.T) {
	type RowType struct {
		FirstName string
		Thing     map[string]int
	}

	file, err := os.CreateTemp(t.TempDir(), "fakeFilename")
	if err != nil {
		t.Fail()
	}
	writer := segmentparquet.NewGenericWriter[RowType](file)
	n, err := writer.Write([]RowType{
		{"John", map[string]int{"a": 1}},
		{"George", map[string]int{"b": 2}},
	})
	_ = n
	if err != nil {
		t.Fail()
	}
	err = writer.Close()
	if err != nil {
		t.Fail()
	}

	reader := segmentparquet.NewGenericReader[any](file)
	for {
		hi := make([]any, 1)
		howMany, err := reader.Read(hi)
		if err != nil {
			break
		}
		if howMany != 1 {
			t.Fail()
		}
	}
}

Here I create a new file from a struct that contains a map, and then try to read it back as any; I get back an object that looks like this:

{"FirstName": "John", "Thing": {"key_value": [{"key":"a","value":1}]}}

instead of:

{"FirstName": "John", "Thing" :{"a":1}}

Note that if I read it again providing RowType as the generic type parameter, the result is correct.
Am I missing something? 🤔

[PowerBI] Not yet implemented: Unsupported encoding.

I encountered an error while attempting to open a file with a string column in PowerBI. The specific error message I received was:

Parquet: class parquet::ParquetException (message: 'Not yet implemented: Unsupported encoding.')

Interestingly, the issue does not occur when the Parquet file contains only numeric columns; it works perfectly in that case.

However, I successfully converted data.csv, which contains strings, to Parquet format using parquet-tools, and I was able to open it in PowerBI without any problems.

Here is the code I used to test the scenario:

// Does not work
type RowString struct{ Data string }
parquet.WriteFile("file_with_string.parquet", []RowString{
	{Data: "Bob"},
})

// Does not work
group := parquet.Group{
	"Data": parquet.Optional(parquet.String()),
}
schema := parquet.NewSchema("schema", group)
output, _ := os.OpenFile("file_with_string2.parquet", os.O_RDWR|os.O_CREATE, 0755)
pw := parquet.NewGenericWriter[any](output, schema)
defer pw.Close()
builder := parquet.NewRowBuilder(group)
builder.Add(0, parquet.ByteArrayValue([]byte(`Bob`)))
pw.WriteRows([]parquet.Row{builder.Row()})

// Works
type RowInt struct{ Data int }
parquet.WriteFile("file_with_int.parquet", []RowInt{
	{Data: 1},
})
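A hedged workaround worth trying (an assumption, not a confirmed diagnosis): if the reader is rejecting the delta encoding parquet-go selects for string columns by default, forcing PLAIN encoding on the column via the plain struct tag may produce a file PowerBI can open.

// Workaround sketch: force PLAIN encoding on the string column. The "plain"
// struct tag is assumed to select the PLAIN encoding for this field.
type RowStringPlain struct {
	Data string `parquet:"data,plain"`
}

parquet.WriteFile("file_with_string_plain.parquet", []RowStringPlain{
	{Data: "Bob"},
})

If that file opens, the unsupported encoding was most likely one of the delta byte array encodings rather than anything about string columns in general.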

Documentation and/or updates for running writer_test.go

The tests in writer_test.go seem very useful as a check of the writers. However, they rely on the parquet-tools command and so can pass or fail depending on the environment configuration. A number of utilities called "parquet-tools" exist, and the one I infer these tests are based on may be deprecated.

It would be really nice to have clear documentation of how to set up one's environment to run these tests, and if it relies on a deprecated utility it would be good to find a maintained replacement.

Spark can't read parquet-go generated files: can not read class org.apache.parquet.format.PageHeader

Hi!

I have a Go program that generates parquet files. (I am using Go 1.17.)

func Example() {
	// Creating writer:
	writer := parquet.NewWriter(
		localParquetFile,
		parquet.SchemaOf(pblogger.ParquetEventV3{}),
		parquet.Compression(parquet.LookupCompressionCodec(format.Snappy)),
		parquet.WriteBufferSize(50_000),
	)

	// Write ~10k rows
	writer.Write(myData)

	// Close
	writer.Close()
	localParquetFile.Close()
}

This is working well. I can read the generated files with the parquet-go library with no issues.

I generate around 1000 files per hour. Each file is around 5 MB.

I have a Spark job ingesting those files and running aggregates on the data.
This Spark job fails because, for some files (not all of them, ~10% of them), it can't read the PageHeader:
Error: can not read class org.apache.parquet.format.PageHeader: Socket is closed by peer
Full trace HERE

When I try to read such a corrupted file with parquet-go, it still works.

Tools like https://github.com/apache/parquet-mr also show the same error on corrupted files when running commands like cat. The rowcount command works fine.

How can I fix this?
Thanks!

go: 1.17.5
parquet-go: github.com/segmentio/parquet-go v0.0.0-20220720215406-8d9a5b560e39
spark: Spark 3.3.2

RLE encoding cannot be specified in parquet struct tags

When generating a schema for a type, there's no struct tag value to specify regular RLE encoding. It seems like all this requires is an additional case when checking the tags and an update to the documentation.

I'm happy to submit a PR for this if it's as simple as it seems.
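For concreteness, here is a purely hypothetical example of what the requested tag could look like once a case for it exists in the tag parser; this does not work today, per this issue.

// Hypothetical: an "rle" keyword in the struct tag selecting the RLE encoding
// for a column where it applies (for example a boolean column).
type Row struct {
	Flag bool `parquet:"flag,rle"`
}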

Add support for int16

When trying to write an int16 field, I get the following panic:

panic: cannot convert Go values of type int16 to parquet value
goroutine 6 [running]:
github.com/parquet-go/parquet-go.writeRowsFuncOf({0x100cfa8c8, 0x100884200}, 0x0?, {0xc000614600, 0x1, 0x1})
/Users/c12326a/git/credit-header/ch-build-conmaster/vendor/github.com/parquet-go/parquet-go/column_buffer.go:2016 +0x588
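A hedged workaround until int16 is handled by the reflection-based conversion: widen the value to int32 in the row type used for writing. Row and Score are hypothetical names used for illustration.

// Workaround sketch: store the int16 value in an int32 field when writing.
type Row struct {
	Score int32 `parquet:"score"` // filled from the int16 source via int32(v)
}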

Drop support for Go 1.17

segmentio#428

  • Remove 1.17 from github workflow
  • Update go.mod to declare the Go 1.18 requirement
  • Migrate code from *_go18.go and *_go18_test.go files into the corresponding *.go and *_test.go files

Unlike the original ticket, there is no need to rename the Generic* APIs yet.

Is it possible to convert any JSON file to a parquet file? The schema of the JSON varies from file to file.

Something like:

var m map[string]interface{}
json.Unmarshal(inputJson, &m)
if err := parquet.WriteFile("test1.parquet", []map[string]interface{}{m}); err != nil {
	fmt.Println(err)
}

I get the following error:
panic: cannot construct parquet schema from value of type map[string]interface {}

goroutine 1 [running]:
github.com/parquet-go/parquet-go.schemaOf({0x1025d8a18?, 0x10256f020})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/schema.go:111 +0x184
github.com/parquet-go/parquet-go.NewGenericWriter[...]({0x1025d0000, 0x1400000e080?}, {0x14000159e28, 0x14000159e38?, 0x1024a5da4?})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/writer.go:95 +0x94
github.com/parquet-go/parquet-go.Write[...]({0x1025d0000, 0x1400000e080}, {0x1400000e078, 0x1, 0x1}, {0x0, 0x102899fe0?, 0x1400000e078?})
/Users/sree/workspace/go/pkg/mod/github.com/parquet-go/[email protected]/parquet.go:75 +0x7c
github.com/parquet-go/parquet-go.WriteFile[...]({0x1024a9441, 0x14000159f48?}, {0x1400000e078?, 0x1, 0x1}, {0x0, 0x0, 0x0})
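The panic is expected with the current API, since a schema cannot be inferred from map[string]interface{}. One possible direction, shown as a sketch under assumptions rather than a confirmed pattern: declare the schema explicitly with parquet.Group and populate rows with parquet.NewRowBuilder. The two-column schema, the input document, and the column-index ordering (group fields sorted by name) are all assumptions made for illustration.

package main

import (
	"encoding/json"
	"log"
	"os"

	"github.com/parquet-go/parquet-go"
)

func main() {
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(`{"name":"Bob","age":42}`), &m); err != nil {
		log.Fatal(err)
	}

	// The schema is declared by hand instead of being inferred from the map.
	group := parquet.Group{
		"name": parquet.String(),
		"age":  parquet.Int(64),
	}
	schema := parquet.NewSchema("record", group)

	f, err := os.Create("test1.parquet")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	w := parquet.NewGenericWriter[any](f, schema)
	builder := parquet.NewRowBuilder(group)
	// Column indexes follow the leaf order of the group; 0 = age, 1 = name is
	// an assumption here (group fields ordered by name).
	builder.Add(0, parquet.Int64Value(int64(m["age"].(float64))))
	builder.Add(1, parquet.ByteArrayValue([]byte(m["name"].(string))))

	if _, err := w.WriteRows([]parquet.Row{builder.Row()}); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}

Handling arbitrary JSON would still require mapping each value's dynamic type to a parquet node and value constructor, which this sketch does not attempt.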

Data corruption when recording byte array column statistics

Discovered while implementing #52

parquet-cli showed that for one of the sample columns containing ["Solo", "Skywalker", "Skywalker"] the min and max values recorded were ("Skywalker", "Skywalker"), not ("Skywalker", "Solo") as they should have been.

I spent a bit more time investigating this. The problem comes from these lines in (*writerColumn).recordPageStats (writer.go:1560):

if existingMaxValue.isNull() || c.columnType.Compare(maxValue, existingMaxValue) > 0 {
    c.columnChunk.MetaData.Statistics.MaxValue = maxValue.Bytes()
}

For byte-array-backed Values, Bytes() returns an unsafe cast of the value ptr to a byte slice. In this case, the ptr points into the values slice of the columnBuffer's page, and that slice gets saved directly into the statistics. It seems like after we finish flushing this page, the page's values slice gets reused; as the next page is written, it overwrites the MaxValue stored in the statistics. In this particular case, the stored MaxValue was "Solo" (&values[0], 4), but "Skywalker" is the first value of the next data page, which gets written to &values[0]. When we update the statistics for this page, we compare "Skywalker" (&values[0], 9) against "Skyw" (&values[0], 4), find it's larger, and update it. In general, though, this could behave even more erratically.

Fundamentally, it seems like the solution is that the stored value needs to be copied into a slice backed by a new array, not left pointing to the one inside the page. The question is: how should we do it?

Option 1: Change the implementation of (Value).Bytes() to always copy the bytes into a new array. This makes the API less dangerous and may prevent other bugs like this, but it will negatively impact performance anywhere Bytes() is called and creating a copy is not dangerous. It's hard to assess how big a deal this is from either perspective. Bytes() is currently only called in (*writerColumn).makePageStatistics and (*writerColumn).recordPageStats. We have to add the copy in recordPageStats, but makePageStatistics seems to work as-is (I think this is because this value isn't held past the page flush in this case.) However, because Value is exported outside the package, the level of safety and performance impact is probably overwhelmed by the effects on usage outside the package, which are hard to know. I worry a little bit that from outside the package it's nearly impossible to tell when the value returned by Bytes is safe or when it needs to be copied, unless for some reason all values exposed outside the module are safe.

Option 2: We could do the copy inside (*writerColumn).recordPageStats. This has the smallest impact on the system, affecting only the one site where we know there's a problem. However, it leaves us and others open to similar bugs in the future. It also imposes a small performance/complexity hit on recordPageStats, though in practice this probably won't matter much. EDIT: I realized that using (Value).AppendBytes actually works better than (Value).Bytes in all cases, and requires the fewest unnecessary allocations.

Option 3: Implement a new method (Value).SafeBytes() (name TBD) that always copies the data into a new slice, and let the caller decide on the behavior. IMO this probably pollutes the API more than it provides useful options, but I include it for completeness.
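For what it's worth, here is a minimal sketch of Option 2 at the site quoted above, assuming (Value).AppendBytes has the signature AppendBytes(dst []byte) []byte and appends a copy of the value's raw bytes; only the method's existence is stated above, the signature is an assumption.

if existingMaxValue.isNull() || c.columnType.Compare(maxValue, existingMaxValue) > 0 {
	// Copy the bytes out of the page buffer instead of aliasing it.
	c.columnChunk.MetaData.Statistics.MaxValue = maxValue.AppendBytes(nil)
}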

Write Parquet file with custom schema

Hello,

Is it possible to produce parquet files whose schema is not known at compile time? I'm trying something like the example below, but I get NULL in all fields of each row.

package main

import (
	"os"

	"github.com/parquet-go/parquet-go"
)

func main() {
	file, err := os.Create("test.parquet")
	if err != nil {
		panic(err)
	}
	defer file.Close()

	s := parquet.NewSchema("Record", parquet.Group{
		"Text": parquet.Optional(parquet.String()),
	})
	w := parquet.NewGenericWriter[any](file, s)
	_, err = w.WriteRows([]parquet.Row{{
		parquet.ByteArrayValue([]byte("Hello")),
	}})
	if err != nil {
		panic(err)
	}

	err = w.Close()
	if err != nil {
		panic(err)
	}
}

The generated schema is:

Metadata for file: test.parquet

version: 1
num of rows: 1
created by: github.com/parquet-go/parquet-go version 0.0.0(build bb12c19a1110)
metadata:
message Record {
  OPTIONAL BYTE_ARRAY Text (STRING);
}

And when I output the content of the produced file:

#################################################
File: test.parquet
#################################################

{Text: null}
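A hedged workaround sketch, reusing the RowBuilder pattern from the PowerBI example above: the NULLs plausibly come from the bare parquet.Value carrying no column index or definition level for the optional column, so building the row against the group may fix it (this is an assumption, not a confirmed diagnosis).

group := parquet.Group{
	"Text": parquet.Optional(parquet.String()),
}
s := parquet.NewSchema("Record", group)
w := parquet.NewGenericWriter[any](file, s)

// Build the row through the schema so the value carries the column index and
// definition level expected for the optional Text column.
builder := parquet.NewRowBuilder(group)
builder.Add(0, parquet.ByteArrayValue([]byte("Hello")))
if _, err := w.WriteRows([]parquet.Row{builder.Row()}); err != nil {
	panic(err)
}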

Panic when reading nested map-values with a schema

In the example test below, I'm trying to write/read a nested map value using an equivalent but different schema. The struct used for writing/reading has the same fields as the one used to generate the schema, but with different types. This currently fails with:

panic: reflect.Value.Convert: value of type parquet_test.SchemaMapValue cannot be converted to type parquet_test.MapValue

Interestingly, this only happens when using a nested map value; it's just fine in a slice or in regular structs. Using a struct as a map value is fine too; it's only when nesting is involved.

While the example below seems contrived since the SchemaMessage technically isn't necessary, I have a use-case that involves generating a new type using reflection which is then used with parquet.SchemaOf() to generate a schema that runs into the same issue.

func TestMapValue(t *testing.T) {
	// Main struct used to marshal/unmarshal data.
	type NestedStruct struct {
		Val string
	}
	type MapValue struct {
		Nested NestedStruct
	}
	type Message struct {
		TestMap map[string]MapValue
	}

	// Clone of the main struct used exclusively to generate the schema.
	type SchemaNestedStruct struct {
		Val string
	}
	type SchemaMapValue struct {
		Nested SchemaNestedStruct
	}
	type SchemaMessage struct {
		TestMap map[string]SchemaMapValue
	}

	testKey, testValue := "test-key", "test-value"
	in := Message{
		TestMap: map[string]MapValue{
			"test-key": {
				Nested: NestedStruct{
					Val: testValue,
				},
			},
		},
	}

	var f bytes.Buffer

	// Generate a schema for reading/writing using an exact clone of data.
	schema := parquet.SchemaOf(SchemaMessage{})

	pw := parquet.NewGenericWriter[Message](&f, schema)
	_, err := pw.Write([]Message{in})
	if err != nil {
		t.Fatal(err)
	}

	err = pw.Close()
	if err != nil {
		t.Fatal(err)
	}

	pr := parquet.NewGenericReader[*Message](bytes.NewReader(f.Bytes()), schema)

	out := make([]*Message, 1)
	_, err = pr.Read(out)
	if err != nil {
		t.Fatal(err)
	}
	pr.Close()
	if want, got := testValue, out[0].TestMap[testKey].Nested.Val; want != got {
		t.Error("failed to read map value")
	}
}

I haven't tried to debug this further myself yet. Any pointers or suggestions for things to try are appreciated.

versioned 0.x.x releases

There is a note in the README that we are on 0.x.x and things are subject to change. A new home needs a new start, and one of the things I would like to do is start tagging our releases.

I intentionally didn't break things on #16 relating to renaming GenericReader to Reader[T] for this reason. No need to break things without a clear migration path.

The plan is to:

  • After consolidating the current work, I will make the first 0.x.x release.
  • I will then remove all deprecated features and embrace generics, possibly move the requirement to Go 1.20, then increment the minor component of the semver and cut a new release (which will be the way forward for the package's development).

This way, users can pin to specific tags, and we can always backport bug fixes.

Note:

I haven't decided which tag version to start with (v0.1.0 sounds silly, lol!); any suggestions are welcome.

Performance regression on lazy load commit

We are seeing a performance regression on the lazy load commit.

We have a holistic benchmark test suite that runs a handful of TraceQL queries over a block.

Bench results:

name                                             old time/op    new time/op    delta
BackendBlockTraceQL/spanAttValNoMatch-8            8.90ms ± 0%   28.06ms ± 1%  +215.32%  (p=0.008 n=5+5)
BackendBlockTraceQL/spanAttIntrinsicNoMatch-8      8.46ms ± 0%   28.20ms ± 0%  +233.27%  (p=0.008 n=5+5)
BackendBlockTraceQL/resourceAttValNoMatch-8        8.29ms ± 1%    8.35ms ± 2%      ~     (p=0.421 n=5+5)
BackendBlockTraceQL/resourceAttIntrinsicMatch-8    22.2ms ± 1%    22.3ms ± 1%      ~     (p=0.548 n=5+5)
BackendBlockTraceQL/mixedValNoMatch-8               252ms ± 1%     278ms ± 3%   +10.19%  (p=0.008 n=5+5)
BackendBlockTraceQL/mixedValMixedMatchAnd-8        8.33ms ± 0%    8.36ms ± 1%      ~     (p=0.690 n=5+5)
BackendBlockTraceQL/mixedValMixedMatchOr-8          265ms ± 1%     269ms ± 2%      ~     (p=0.056 n=5+5)
BackendBlockTraceQL/count-8                         472ms ±13%     495ms ± 9%      ~     (p=0.222 n=5+5)
BackendBlockTraceQL/struct-8                        776ms ± 2%     774ms ± 2%      ~     (p=0.690 n=5+5)
BackendBlockTraceQL/||-8                            153ms ± 1%     154ms ± 2%      ~     (p=0.548 n=5+5)
BackendBlockTraceQL/mixed-8                        28.7ms ± 1%    29.0ms ± 1%      ~     (p=0.056 n=5+5)

Profiles:

profiles.tar.gz

Corrupt parquet files due to overflow in UncompressedPageSize

I came across an issue where parquet files written by parquet-go were found to be corrupted, leading to an inability to read them using parquet-go. The panic occurred within the filePages.ReadPage() function, more specifically at bufferPool.get(bufferSize int).

Upon debugging, it was evident that the bufferSize causing the panic had the value -1926395834. The buffer size is derived from header.UncompressedPageSize.

Based on my investigation, it appears that an overflow may be occurring in the conversion of the uncompressed page size value from int to int32 in writer.go.
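For illustration, a minimal sketch of the kind of guard that could surface the problem at write time instead of producing a corrupt file; uncompressedPageSize and header.UncompressedPageSize are taken from the description above, while the exact location for such a check in writer.go is an assumption (imports assumed: fmt and math).

if uncompressedPageSize > math.MaxInt32 {
	return fmt.Errorf("parquet: uncompressed page size %d overflows int32", uncompressedPageSize)
}
header.UncompressedPageSize = int32(uncompressedPageSize)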

Who is responsible for cleaning up the files created by `fileBufferPool.GetBuffer`?

Hi there! Thanks for the amazing library. One question I have relates to the ability to store parquet pages in files. From the docs:

writer := parquet.NewGenericWriter[Row](oFile, writerConfig, parquet.ColumnPageBuffers(
    parquet.NewFileBufferPool(dir, "buffers.*"),
))

It seems like the underlying fileBufferPool.GetBuffer opens a temporary file and returns an io.ReadWriteSeeker, and it seems to me this is never closed anywhere. I'm currently experiencing issues related to the number of open file descriptors. Does it make sense to close the buffers as soon as they are done being used?
