Giter Site home page Giter Site logo

parquet-index's Introduction

parquet-index

Spark SQL index for Parquet tables

Build Status Coverage Status Join the chat at https://gitter.im/lightcopy/parquet-index

Overview

Package allows to create index for Parquet tables (as datasource and persistent tables) to reduce query latency when used for almost interactive analysis or point queries in Spark SQL. It is designed for use case when table does not change frequently, but is used for queries often, e.g. using Thrift JDBC/ODBC server. When indexed, schema and list of files (including partitioning) will be automatically resolved from index metastore instead of inferring schema every time datasource is created.

Project is experimental. Any feedback, issues, or PRs are welcome.

Documentation reflects changes in master branch, for documentation on a specific version, please select corresponding version tag or branch.

Metastore

Metastore keeps information about all indexed tables and can be created on local file system or HDFS (see available options below) with support for in-memory cache of index (after first scan). Each created index includes different statistics (min/max/null) and, optionally, column filters statistics (e.g. bloom filters) on indexed columns.

Supported predicates

Index is automatically enabled for scan when provided predicate contains one or several filters with indexed columns; if no filters on indexed columns are provided, then normal scan is used, but with benefits of already resolved partitions and schema. Applying min/max statistics and column filter statistics (if available) happens after partition pruning. Statistics are kept per Parquet block metadata. Note that performance also depends on values distribution and predicate selectivity. Spark Parquet reader is used to read data.

Most of the Spark SQL predicates are supported to use statistics and/or column filter (EqualTo, In, GreaterThan, LessThan, and others). Note that predicates work best for equality or isin conditions and logical operators (And, Or, Not), e.g. $"a" === 1 && $"b" === "abc" or $"a".isin("a", "b", "c").

Supported Spark SQL types

Currently only these types are supported for indexed columns:

  • IntegerType
  • LongType
  • StringType
  • DateType
  • TimestampType

Limitations

  • Indexed columns must be top level primitive columns with types above
  • Indexed columns cannot be the same as partitioning columns
  • Append mode is not yet supported for Parquet table when creating index
  • Certain Spark versions are supported (see table below)

Requirements

Spark version parquet-index latest version
1.6.x Not supported
2.0.0 0.2.3
2.0.1 0.2.3
2.0.2 0.2.3
2.1.x 0.3.0
2.2.x 0.4.0
2.3.x fd442d (not released yet)
2.4.x 5051f9 (not released yet)
3.0.0 0.5.0
  • Scala 2.12.x
  • JDK 8+

Previous versions have support for Scala 2.11.x, Scala 2.10.x, and JDK 7, see README and build.sbt for corresponding tag or branch. See build section to compile for desired Java/Scala versions.

And, if using the Python API, Python 3.x with a working version of pyspark.

The current version parts ways with Python 2 definitely. Python 2.7 is officially deprecated, which is the reason why we opted not to write a retrocompatible wrapper around the Scala API.

Linking

The parquet-index package can be added to Spark by using the --packages command line option. For example, run this to include it when starting spark-shell (Scala 2.12.x):

 $SPARK_HOME/bin/spark-shell --packages lightcopy:parquet-index:0.5.0-s_2.12

Or for pyspark to use Python 3 API (see section below):

$SPARK_HOME/bin/pyspark --packages lightcopy:parquet-index:0.5.0-s_2.12

Options

Currently supported options, use --conf key=value on a command line to provide options similar to other Spark configuration or add them to spark-defaults.conf file.

Name Description Default
spark.sql.index.metastore Index metastore location, created if does not exist (file:/folder, hdfs://host:port/folder) ./index_metastore
spark.sql.index.parquet.filter.enabled When set to true, write filter statistics for indexed columns when creating table index, otherwise only min/max statistics are used. Filter statistics are used during filtering stage, if applicable (true, false) true
spark.sql.index.parquet.filter.type When filter statistics enabled, select type of statistics to use when creating index (bloom, dict) bloom
spark.sql.index.parquet.filter.eagerLoading When set to true, read and load all filter statistics in memory the first time catalog is resolved, otherwise load them lazily as needed when evaluating predicate (true, false) false
spark.sql.index.createIfNotExists When set to true, create index if one does not exist in metastore for the table, and will use all available columns for indexing (true, false) false
spark.sql.index.partitions When creating index uses this number of partitions. If value is non-positive or not provided then uses sc.defaultParallelism * 3 or spark.sql.shuffle.partitions configuration value, whichever is smaller min(default parallelism * 3, shuffle partitions)

Example

Scala API

Most of the API is defined in DataFrameIndexManager. Usage is similar to Spark's DataFrameReader, but for spark.index. See example below on different commands (runnable in spark-shell).

// Start spark-shell and create dummy table "codes.parquet", use repartition
// to create more or less generic situation with value distribution
spark.range(0, 1000000).
  select($"id", $"id".cast("string").as("code"), lit("xyz").as("name")).
  repartition(400).
  write.partitionBy("name").parquet("temp/codes.parquet")

import com.github.lightcopy.implicits._
// Create index for table, this will create index files in index_metastore,
// you can configure different options - see table above

// All Spark SQL modes are available ('append', 'overwrite', 'ignore', 'error')
// You can also use `.indexByAll` to choose all columns in schema that
// can be indexed
spark.index.create.
  mode("overwrite").indexBy($"id", $"code").parquet("temp/codes.parquet")

// Check if index for table exists, should return "true"
spark.index.exists.parquet("temp/codes.parquet")

// Query table using index, should return 1 record, and will scan only small
// number of files (1 file usually if filter statistics are enabled). This
// example uses filters on both columns, though any filters can be used,
// e.g. only on id or code
// Metastore will cache index catalog to reduce time for subsequent calls
spark.index.parquet("temp/codes.parquet").
  filter($"id" === 123 && $"code" === "123").collect

// Delete index in metastore, also invalidates cache
// no-op if there is such index does not exist
// (does NOT delete original table)
spark.index.delete.parquet("temp/codes.parquet")

// You can compare performance with this
spark.read.parquet("temp/codes.parquet").
  filter($"id" === 123 && $"code" === "123").collect

Java API

To use indexing in Java create QueryContext based on SparkSession and invoke method index() to get index functionality. Example below illustrates how to use indexing in standalone application.

import com.github.lightcopy.QueryContext;

// Optionally use `config(key, value)` to specify additional index configuration
SparkSession spark = SparkSession.
  builder().
  master("local[*]").
  appName("Java example").
  getOrCreate();

// Create query context, entry point to working with parquet-index
QueryContext context = new QueryContext(spark);

// Create index by inferring columns from Parquet table
context.index().create().indexByAll().parquet("table.parquet");

// Create index by specifying index columns, you can also provide `Column` instances, e.g.
// `new Column[] { new Column("col1"), new Column("col2") }`.
// Mode can be provided as `org.apache.spark.sql.SaveMode` or String value
context.index().create().
  mode("overwrite").
  indexBy(new String[] { "col1", "col2" }).
  parquet("table.parquet");

// Check if index exists for the table
boolean exists = context.index().exists().parquet("table.parquet");

// Run query for indexed table
Dataset<Row> df = context.index().parquet("table.parquet").filter("col2 = 'c'");

// Delete index from metastore
context.index().delete().parquet("table.parquet");

Python 3.x API

Following example shows usage of Python 3 API (runnable in pyspark)

from lightcopy.index import QueryContext

# Create QueryContext from SparkSession
context = QueryContext(spark)

# Create index in metastore for Parquet table 'table.parquet' using 'col1' and 'col2' columns
context.index.create.indexBy('col1', 'col2').parquet('table.parquet')

# Create index in metastore for Parquet table 'table.parquet' using all inferred columns and
# overwrite any existing index for this table
context.index.create.mode('overwrite').indexByAll().parquet('table.parquet')

# Check if index exists, returns 'True' if exists, otherwise 'False'
context.index.exists.parquet('table.parquet')

# Query index for table, returns DataFrame
df = context.index.parquet('table.parquet').filter('col1 = 123')

# Delete index from metastore, if index does not exist - no-op
context.index.delete.parquet('table.parquet')

Persistent tables API

Package also supports index for persistent tables that are saved using saveAsTable() in Parquet format and accessible using spark.table(tableName). When using with persistent tables, just replace .parquet(path_to_the_file) with .table(table_name). API is available in Scala, Java and Python 3.

Scala

import com.github.lightcopy.implicits._

// Create index for table name that exists in Spark catalog
spark.index.create.indexBy("col1", "col2", "col3").table("table_name")

// Check if index exists for persistent table
val exists: Boolean = spark.index.exists.table("table_name")

// Query index for persistent table
val df = spark.index.table("table_name").filter("col1 > 1")

// Delete index for persistent table (does not drop table itself)
spark.index.delete.table("table_name")

Java

// Java API is very similar to Scala API
import com.github.lightcopy.QueryContext;

SparkSession spark = ...;

QueryContext context = new QueryContext(spark);

// Create index for persistent table
context.index().create().indexByAll().table("table_name");

// Check if index exists for persistent table
boolean exists = context.index().exists().table("table_name");

// Run query for indexed persistent table
Dataset<Row> df = context.index().table("table_name").filter("col2 = 'c'");

// Delete index from metastore (does not drop table)
context.index().delete().table("table_name");

Python 3

from lightcopy.index import QueryContext

context = QueryContext(spark)

# Create index from Spark persistent table
context.index.create.mode('overwrite').indexBy('col1', 'col2').table('table_name')

# Check if index exists for persistent table. 'True' if exists in metastore, 'False' otherwise
context.index.exists.table('table_name')

# Query indexed persistent table
df = context.index.table('table_name').filter('col1 = 123')

# Delete index for persistent table (does not drop table)
context.index.delete.table('table_name')

Building From Source

This library is built using sbt, to build a JAR file simply run sbt package from project root.

Testing

Run sbt test from project root. See .travis.yml for CI build matrix.

parquet-index's People

Contributors

aaaaaaron avatar arunbhat avatar erolm-a avatar sadikovi avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

parquet-index's Issues

Add SQL API

This one can be tricky, since we need to hack sql parser in Spark to support this feature. This will enable usage of JDBC in Spark.

Fails to create index for UTF strings

Fails to create due to #9. Exception stack trace:

java.lang.IllegalArgumentException: requirement failed: Min é is greater than max a
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetStringStatistics.<init>(statistics.scala:171)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$.convertStatistics(ParquetStatisticsRDD.scala:238)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$$anonfun$5.apply(ParquetStatisticsRDD.scala:219)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD$$anonfun$5.apply(ParquetStatisticsRDD.scala:212)
  • Example to reproduce:
Seq("a", "é").toDF("name").coalesce(1).write.parquet("temp/bad.parquet")
spark.read.parquet("temp/bad.parquet").where("name > 'a'").show

When trying the same query with index, exception above is thrown.

  • Example to show incorrect result:
Seq("aa", "", "bb").toDF("name").coalesce(1).write.parquet("temp/bad2.parquet")
spark.read.parquet("temp/bad2.parquet").where("name > 'bb'").show

returns this result:

+----+
|name|
+----+
||
+----+
spark.index.create.indexByAll.parquet("temp/bad2.parquet")
spark.index.parquet("temp/bad2.parquet").where("name > 'bb'").show

returns wrong result:

+----+
|name|
+----+
+----+

Ability to create index when loading table

Add flag spark.sql.index.createIfNotExists or something similar to support situation of creating index when first index scan is made for a table. Subsequent scans will use created index.

Refactor catalog tests to easily check partitions selection

This issue is for updating index catalog and source strategy tests to make it easy to verify filter effect on partition selection, e.g. testing GreaterThan filter on selecting only these partitions and not touching ones that do not have matched statistics, etc.

Fail to load index from empty folder

This is inverse of #26. Since index check is based on verifying that directory exists, it returns true, if directory already created, but when loading index, fails to look up _table_metadata for Parquet index. It would be better to create _SUCCESS file in root index directory to know whether or not folder is index directory.

Update filter flags to allow other than bloom filters

Currently we have flag to enable bloom filters, we should update it to enable filter statistics and type of those filters, e.g. bloom, dict, etc. I am proposing these options:

  • spark.sql.index.parquet.filter.enabled - whether or not filter statistics are enabled, default is true
  • spark.sql.index.parquet.filter.type - filter statistics type, e.g. bloom, dict, and others, default is bloom
  • spark.sql.index.parquet.filter.eagerLoading - whether or not load filters eagerly when we read index catalog (everything will be in memory), or lazily, only loading certain filters that are required to evaluate predicate (same as what we currently do)

Consider cache for query plan

Currently we cache filter statistics and table metadata for each queried table. This issue is about caching query plan, so when we hit the same plan, we can yield result immediately without resolving filters at all.

Investigate splitting column statistics into blocks

Currently with statistics patch we will have only column metadata, block metadata will be one per file, regardless how large file is. We should investigate if it is worth maintaining separate statistics for each block (e.g. every 1,000,000 records).

Filter selection should be documented

When filtering by 2 columns, only one of them is indexed, you would see similar to this log:

col("col1") === new java.sql.Date(300000000L) || col("col2") === new java.sql.Timestamp(330000000000L)

scala> df.show
17/02/05 20:36:01 INFO IndexSourceStrategy: Pruning directories with: 
17/02/05 20:36:01 INFO IndexSourceStrategy: Index columns List(col1#10)
17/02/05 20:36:01 INFO IndexSourceStrategy: Index set {col1#10}
17/02/05 20:36:01 INFO IndexSourceStrategy: Normalized filters List(((col1#10 = 3) || (col2#11 = 330000000000000)))
17/02/05 20:36:01 INFO IndexSourceStrategy: Applying index filters: 
17/02/05 20:36:01 INFO IndexSourceStrategy: Post-Scan filters: ((col1#10 = 3) || (col2#11 = 330000000000000))
17/02/05 20:36:01 INFO IndexSourceStrategy: Output data schema: struct<col1: date, col2: timestamp, col3: string, col4: boolean ... 2 more fields>
17/02/05 20:36:01 INFO IndexSourceStrategy: Pushed filters: Or(EqualTo(col1,1970-01-04),EqualTo(col2,1980-06-16 22:40:00.0))

It does not apply index filters on line 17/02/05 20:36:01 INFO IndexSourceStrategy: Applying index filters: . This should be documented in comments, it happens because there is no way to separate predicates for filtering, e.g. we would not know if col2 value exists, therefore need to scan entire table, so index is disabled in this case.

Use bucketing for index

Currently bucketing is not supported, meaning that package does not take advantage of bucketing, so bucketed table would be processed and indexed like standard partitioned table. We should investigate if we can leverage that information in index.

Reduce logging in tests

Mainly need to remove Parquet logging when reading a file. Would be good to log only relevant classes in org.apache.spark.sql.

Fail to check if index exists for non-existent directory

Expected to return boolean, throws exception instead in case when looking up non-existent directory:

scala> spark.index.exists.parquet("temp/abc")
17/01/02 19:49:01 INFO Metastore: Resolved metastore directory to file:/Users/sadikovi/Developer/parquet-index/index_metastore
17/01/02 19:49:01 INFO Metastore: Registered file system org.apache.hadoop.fs.LocalFileSystem@529b0328
17/01/02 19:49:01 INFO Metastore: Registered cache com.google.common.cache.LocalCache$LocalManualCache@5cf2ed6f
java.io.FileNotFoundException: File temp/abc does not exist
  at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:537)
  at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:750)
  at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:527)
  at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:409)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource$.resolveTablePath(IndexedDataSource.scala:150)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource.tablePath$lzycompute(IndexedDataSource.scala:43)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource.tablePath(IndexedDataSource.scala:41)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$existsIndex$1.apply(IndexedDataSource.scala:102)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$existsIndex$1.apply(IndexedDataSource.scala:102)
  at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource.logInfo(IndexedDataSource.scala:32)
  at org.apache.spark.sql.execution.datasources.IndexedDataSource.existsIndex(IndexedDataSource.scala:102)
  at org.apache.spark.sql.ExistsIndexCommand.table(DataFrameIndexManager.scala:221)
  at org.apache.spark.sql.ExistsIndexCommand.parquet(DataFrameIndexManager.scala:227)
  ... 50 elided

Fail with metastore permissions error

From time to time loading index fails with permissions error:

java.lang.IllegalStateException: Expected directory with rwxrw-rw-, found file:/Users/sadikovi/Developer/parquet-index/index_metastore(rwxr-xr-x)
  at org.apache.spark.sql.execution.datasources.Metastore.validateMetastoreStatus(Metastore.scala:112)
  at org.apache.spark.sql.execution.datasources.Metastore.resolveMetastore(Metastore.scala:93)

Add performance tests and benchmark

Add performance tests to compare with Parquet implementation or compare performance against releases. This should be run as part of CI to determine if there is a regression in performance.

NullPointerException when running on Yarn

When running in Yarn mode, I get a NPE, same code works in local mode
`context.index.create.indexBy('driverId', 'timestamp').parquet('s3://archival-store/test/bench1day7/gps/packet_date=2018-03-25/')

Traceback (most recent call last):
File "", line 1, in
File "/home/anshuls/parquet-index/python/src/lightcopy/index.py", line 122, in parquet
self._createIndex(path)
File "/home/anshuls/parquet-index/python/src/lightcopy/index.py", line 103, in _createIndex
jcreate.createIndex(path)
File "/usr/local/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in call
File "/usr/local/spark/latest/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/spark/latest/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o62.createIndex.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 1.0 failed 4 times, most recent failure: Lost task 36.3 in stage 1.0 (TID 116, ip-172-18-54-234.us-west-2.compute.internal, executor 1): java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:232)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:223)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD.compute(ParquetStatisticsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.datasources.parquet.ParquetMetastoreSupport.createIndex(ParquetMetastoreSupport.scala:140)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$createIndex$2.apply(IndexedDataSource.scala:108)
at org.apache.spark.sql.execution.datasources.IndexedDataSource$$anonfun$createIndex$2.apply(IndexedDataSource.scala:107)
at org.apache.spark.sql.execution.datasources.Metastore.create(Metastore.scala:162)
at org.apache.spark.sql.execution.datasources.IndexedDataSource.createIndex(IndexedDataSource.scala:107)
at org.apache.spark.sql.CreateIndexCommand.createIndex(DataFrameIndexManager.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:232)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:223)
at org.apache.spark.sql.execution.datasources.parquet.ParquetStatisticsRDD.compute(ParquetStatisticsRDD.scala:122)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
`
It looks like the hadoopConf is somehow Null, although I'm able to access it in my shell just fine

Can it work with Spark SQL directly?

I mean the SQL language in Spark SQL, rather than spark sql api.
For example:
Dataset sqlDF =
spark.sql("SELECT * FROM parquet.examples/src/main/resources/users.parquet");

Improve filter resolution in foldFilter

It can be very slow to load filters for the first time. We can load them in parallel when incremental loading is selected. It does not apply for eager loading.

Fails for empty blocks

Querying table fails if one of the Parquet files is empty, because of the condition on non-empty blocks.

17/01/08 17:41:43 INFO IndexSourceStrategy: Applying index filters: IsNotNull(col1),EqualTo(col1,2)
java.lang.IllegalArgumentException: requirement failed: Parquet file status has empty blocks, required at least one block metadata
  at scala.Predef$.require(Predef.scala:224)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog.resolveSupported(ParquetIndexCatalog.scala:131)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog$$anonfun$pruneIndexedPartitions$2$$anonfun$8.apply(ParquetIndexCatalog.scala:144)
  at org.apache.spark.sql.execution.datasources.parquet.ParquetIndexCatalog$$anonfun$pruneIndexedPartitions$2$$anonfun$8.apply(ParquetIndexCatalog.scala:143)

Lazy column filter loading vs eager loading

Currently filters are loaded lazily, which will still be the case in refactoring work for statistics. I think we should also consider eager filter loading, when everything is loaded at the time catalog is loaded and cached; possible issue might be when we add dictionary filters which can be take fair amount of space, and it would be better to load them on demand.

parquet-index is fantastic

Tes Env && Data

I have tested parquet-index with spark-2.0.1 in local model for a long time:

driver: --master local[1]
spark.driver.memory                1g

the data total count is: 464946

test data: wget http://tsdata.bts.gov/PREZIP/On_Time_On_Time_Performance_2015_09.zip

index

i convert csv to parquet partition by year and month.
first query base on parquet file, and the time cost:

/Library/Java/JavaVirtualMachines/jdk8/bin/java -cp /Users/nathan/Tool/spark/spark2/conf/:/Users/nathan/Tool/spark/spark2/jars/*:/Users/nathan/Tool/hadoop/hadoop2/etc/hadoop/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/*:/Users/nathan/Tool/hadoop/hadoop2/contrib/capacity-scheduler/*.jar -Xmx1g -server -Xms256m -XX:+UseG1GC -XX:+UseCompressedOops -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark org.apache.spark.deploy.SparkSubmit --master local[1] --class com.alipay.spark.App /Users/nathan/Tool/spark/spark2/spark-demo.jar sql
------------->>>>>/Users/nathan/Tool/spark/spark2/conf/config.properties
{csv=file:/Users/nathan/data/ontime/csv/*.csv, index=false, parquet=file:/Users/nathan/data/ontime/parq/}
sql>select count(1) from ontime where Quarter=3;
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
|  464946|
+--------+

time spend: 1927ms
total 1 records

i restart the spark shell , and still first query with parquet-index, index key: Quarter, the time cost is very fantastic:

nathan@/Users/nathan/Tool/spark/spark2➜ ./run.local.sh
/Library/Java/JavaVirtualMachines/jdk8/bin/java -cp /Users/nathan/Tool/spark/spark2/conf/:/Users/nathan/Tool/spark/spark2/jars/*:/Users/nathan/Tool/hadoop/hadoop2/etc/hadoop/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/common/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/hdfs/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/yarn/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/lib/*:/Users/nathan/Tool/hadoop/hadoop2/share/hadoop/mapreduce/*:/Users/nathan/Tool/hadoop/hadoop2/contrib/capacity-scheduler/*.jar -Xmx1g -server -Xms256m -XX:+UseG1GC -XX:+UseCompressedOops -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark org.apache.spark.deploy.SparkSubmit --master local[1] --class com.alipay.spark.App /Users/nathan/Tool/spark/spark2/spark-demo.jar sql
------------->>>>>/Users/nathan/Tool/spark/spark2/conf/config.properties
{csv=file:/Users/nathan/data/ontime/csv/*.csv, index=true, parquet=file:/Users/nathan/data/ontime/parq/}
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
sql>select count(1) from ontime where Quarter=3;
+--------+
|count(1)|
+--------+
|  464946|
+--------+

time spend: 931ms
total 1 records

Make column filter statistics typed

Currently ColumnFilterStatistics does not enforce types, we should change to unify types for ColumnStatistics and ColumnFilterStatistics.

Investigate different implementation of ParquetReader

Currently we are using Spark Parquet reader, this issue is about investigating if we can extract data pages and index those including each page statistics. During scan we would select only those pages that match predicate and read data from them.

Questions:

  • if file is compressed, is it worth doing this read?
  • How to reconstruct full record with this approach?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.