Giter Site home page Giter Site logo

apache / arrow-datafusion-comet Goto Github PK

View Code? Open in Web Editor NEW
458.0 53.0 103.0 5.11 MB

Apache DataFusion Comet Spark Accelerator

Home Page: https://datafusion.apache.org/comet

License: Apache License 2.0

Makefile 0.16% Shell 0.26% Java 14.06% Scala 38.87% Rust 46.66%
arrow datafusion rust spark

arrow-datafusion-comet's Introduction

Apache DataFusion Comet

Apache DataFusion Comet is an Apache Spark plugin that uses Apache DataFusion as native runtime to achieve improvement in terms of query efficiency and query runtime.

Comet runs Spark SQL queries using the native DataFusion runtime, which is typically faster and more resource efficient than JVM based runtimes.

Comet aims to support:

  • a native Parquet implementation, including both reader and writer
  • full implementation of Spark operators, including Filter/Project/Aggregation/Join/Exchange etc.
  • full implementation of Spark built-in expressions
  • a UDF framework for users to migrate their existing UDF to native

Architecture

The following diagram illustrates the architecture of Comet:

Current Status

The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.

Feature Parity with Apache Spark

The project strives to keep feature parity with Apache Spark, that is, users should expect the same behavior (w.r.t features, configurations, query results, etc) with Comet turned on or turned off in their Spark jobs. In addition, Comet extension should automatically detect unsupported features and fallback to Spark engine.

To achieve this, besides unit tests within Comet itself, we also re-use Spark SQL tests and make sure they all pass with Comet extension enabled.

Supported Platforms

Linux, Apple OSX (Intel and M1)

Requirements

  • Apache Spark 3.2, 3.3, or 3.4
  • JDK 8, 11 and 17 (JDK 11 recommended because Spark 3.2 doesn't support 17)
  • GLIBC 2.17 (Centos 7) and up

Getting started

See the DataFusion Comet User Guide for installation instructions.

Contributing

See the DataFusion Comet Contribution Guide for information on how to get started contributing to the project.

arrow-datafusion-comet's People

Contributors

advancedxy avatar andygrove avatar beryllw avatar ceppelli avatar comphead avatar dependabot[bot] avatar edmondop avatar ganeshkumar269 avatar haoxins avatar huaxingao avatar jc4x4 avatar kazuyukitanimura avatar leoluan2009 avatar mattharder91 avatar parthchandra avatar planga82 avatar psvri avatar rohitrastogi avatar rz-vastdata avatar snmvaughan avatar sonhmai avatar sunchao avatar thexiay avatar tshauck avatar vaibhawvipul avatar vidyasankarv avatar viirya avatar wankunde avatar wforget avatar zuston avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

arrow-datafusion-comet's Issues

Create CI/CD pipelines

We need to create some basic CI/CD pipelines to run through tests to at least run through tests in the repo itself.

Add executeColumnarCollectIterator to CometExec to collect Comet operator result

What is the problem the feature request solves?

Currently except for executing entire query plan by calling collect, show etc., we don't have a way to collect the result of any Comet operators in the middle of a query. In many cases, we need this feature so we can evaluate a child operator inside another Comet operator. This is also useful for debugging an arbitrary Comet operator.

Describe the potential solution

Add an new API executeColumnarCollectIterator to CometExec to collect Comet operator result.

Additional context

No response

Enable columnar shuffle by default

What is the problem the feature request solves?

COMET_COLUMNAR_SHUFFLE_ENABLED is the config Comet uses to decide if columnar shuffle should be used instead of native shuffle, if comet shuffle is enabled.

Because columnar shuffle covers more use cases (hash, range and single partitioning) than native shuffle. We should change this config to prefer columnar shuffle.

Note that this is blocked by the Java Arrow issue: apache/arrow#40038 (PR: apache/arrow#40043)

Currently if columnar shuffle is enabled, running Comet with TPCDS queries will get the error:

General execution error with reason org.apache.comet.CometNativeException: Fail to process Arrow array with reason C Data interface error: The external buffer at position 1 is null...

Describe the potential solution

No response

Additional context

No response

What's the Supported Input Data Type

What is the problem the feature request solves?

Hi Team,

I am wondering will we have a documentation about what kinda of data types are supported by Comet/Datafusion (Like, Decimal(16, 6), interger .... ) and will not fallback to vanilla spark and cause the regression.

Or where I can lookup to find related information. Since this is very important while we are trying to evaluate whether to use Comet or not.

Thank you so much !

Describe the potential solution

No response

Additional context

No response

Add CI for TPCDS queries

What is the problem the feature request solves?

We need a CI pipeline that could help us verify Comet query correctness.

Describe the potential solution

Add a CI pipeline that runs TPCDS queries with Comet enabled and compares the results with Spark.

Additional context

No response

Consider removing `ParquetFilters`

What is the problem the feature request solves?

The ParquetFilters were added originally so we could shade Parquet in Comet. However the shading was removed later as it caused a lot of trouble on the Iceberg side. In addition, we ported several Parquet classes from parquet-mr into Comet, so now the boundary between Comet and parquet-mr is fairly thin. Therefore, we could consider removing ParquetFilters and directly use the one from Spark.

The advantage of this is we are able to absorb changes & improvements in the newer version of Spark. For instance, apache/spark#36696 added Parquet In/NotIn pushdown from Spark side, which is only available since Spark 3.4. At the moment, as Comet keeps a copy of Spark's ParquetFilters, the feature is not added in order to be backward compatible with Spark 3.2 and 3.3.

Describe the potential solution

Evaluate whether we can remove ParquetFilters from Comet.

Additional context

No response

Cast string to timestamp not compatible with Spark logic

I was manually experimenting with some cast operations based on my experience of implementing them in Spark RAPIDS and found the following example of incorrect behavior. I would recommend implementing some fuzz tests to find these kind of issues.

Test data

scala> robots.show
+------+
|  name|
+------+
|WALL-E|
|  R2D2|
|    T2|
+------+

Test with Comet

scala> import org.apache.spark.sql.types._

scala> val df = robots.withColumn("date", col("name").cast(DataTypes.TimestampType))

scala> df.show
+------+----+
|  name|date|
+------+----+
|WALL-E|null|
|  R2D2|null|
|    T2|null|
+------+----+

Test with Spark

scala> spark.conf.set("spark.comet.enabled", false)

scala> df.show
+------+-------------------+
|  name|               date|
+------+-------------------+
|WALL-E|               null|
|  R2D2|               null|
|    T2|2024-02-09 02:00:00|
+------+-------------------+

T2 is a valid timestamp because T is the separator between the optional date and the time portion. 2 is a valid time because some time fields are optional.

Maven cache in Github workflow is not working

Describe the bug

It looks like the Maven cache we added for Github workflow is not working.

In "Post Cache Maven dependencies", I see this message:

Warning: Path Validation Error: Path(s) specified in the action for caching do(es) not exist, hence no cache is being saved.

And Maven compilation needs to download artifacts each run:

Downloading from central: https://repo.maven.apache.org/maven2/org/scalastyle/scalastyle-maven-plugin/1.0.0/scalastyle-maven-plugin-1.0.0.pom
Progress (1): 1.4/8.6 kB
Progress (1): 2.8/8.6 kB
Progress (1): 4.1/8.6 kB
Progress (1): 5.5/8.6 kB
Progress (1): 6.9/8.6 kB
Progress (1): 8.3/8.6 kB
Progress (1): 8.6 kB

Steps to reproduce

Currently it's reproduce-able in each CI run.

Expected behavior

The Maven dependencies should be cached so new runs don't need to download the artifacts again.

Additional context

No response

Exception occurred while broadcasting an empty result

Describe the bug

An exception occurs when broadcasting an empty result. I think this is due to the extra bytes being written in the encode compressed stream, causing it to be determined not empty in CometExec$.decodeBatches.

error detail:

Caused by: java.io.IOException: Unexpected end of input. Missing schema.
	at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:205)
	at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:185)
	at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:176)
	at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:73)
	at org.apache.comet.vector.StreamReader.<init>(StreamReader.scala:39)
	at org.apache.spark.sql.comet.execution.shuffle.ArrowReaderIterator.<init>(ArrowReaderIterator.scala:36)
	at org.apache.spark.sql.comet.CometExec$.decodeBatches(operators.scala:135)
	at org.apache.spark.sql.comet.CometBatchRDD.$anonfun$compute$1(CometBroadcastExchangeExec.scala:249)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
	at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
	at org.apache.spark.sql.comet.CometBatchRDD.compute(CometBroadcastExchangeExec.scala:249)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)

Steps to reproduce

broadcast tbl_a in CometBroadcastExchangeExec test case

SELECT /*+ BROADCAST(tbl_a) */ tbl_a._1, tbl_b._2 FROM tbl_a JOIN tbl_b " +
                "WHERE tbl_a._1 > tbl_a._2 LIMIT 2

Expected behavior

No response

Additional context

No response

Support build with Java 1.8

What is the problem the feature request solves?

Currently, the main branch cannot build with Java 1.8. The first issue when building with Java 1.8 is as follows:

[ERROR] Failed to execute goal io.github.git-commit-id:git-commit-id-maven-plugin:5.0.0:revision (get-the-git-infos) on project comet-common-spark3.4_2.12: Execution get-the-git-infos of goal io.github.git-commit-id:git-commit-id-maven-plugin:5.0.0:revision failed: Unable to load the mojo 'revision' in the plugin 'io.github.git-commit-id:git-commit-id-maven-plugin:5.0.0' due to an API incompatibility: org.codehaus.plexus.component.repository.exception.ComponentLookupException: pl/project13/maven/git/GitCommitIdMojo has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0

This is because git-commit-id-maven-plugin dropped Java 1.8 support in 5.0.0. There are other various errors that doesn't work with Java 1.8.

However, in the README, it indicates that it only requires JDK 8 and up
https://github.com/apache/arrow-datafusion-comet/blob/main/README.md#requirements

Do you think is it worthy to make comet support JDK 8? I think it would be beneficial to support JDK 8 as Spark itself still supports JDK 8 in 3.5 and it's only dropped in the up-coming 4.0 release.

Describe the potential solution

  1. downgrade the plugin version to be compatible with JDK 8
  2. various changes to make code compiles with JDK 8
  3. set-up ci to build and run with JDK 8

Additional context

No response

Run Spark SQL tests in CI

We need some way to run through all Spark SQL tests (i.e., those under spark/sql module), with Comet plugin enabled, for all the Spark branches we support (3.2, 3.3, 3.4 for now). One approach that have been discussed is we clone the the OSS Spark branches or release tags with some .diff files that we maintain in this repo. The downside, of course, is that these .diff files are relatively hard to maintain and could break from time to time when we sync these Spark branches with new commits.

Planning to publish Roadmap?

Hi, I would like to inquire about a roadmap for Comet.

I am currently exploring the option of replacing Spark workloads with native engines, and I'm keen to understand the future plans and scope of this plugin. Any information on the roadmap would be greatly appreciated.

Thank you!

Join support

This is an umbrella ticket for adding Join support to Comet. In Spark, there are basically three types of Join operators: BroadcastJoin, HashJoin, SortMergeJoin. In DataFusion, two Join operators are supported: HashJoin, SortMergeJoin (experimental).

We are going to delegate Spark Join operators to correspond DataFusion Join operators. We will also go to improve DataFusion Join operators if needed.

SortMergeJoin

HashJoin

  • Support HashJoin operator - #193

BroadcastJoin

  • Add CometBroadcastExchangeExec support broadcasting the result of Comet native operator - #81
  • Support BroadcastHashJoinExec #202
  • Enable Comet broadcast by default #212

Other Join operators (Cross Join)

  • Support CartesianProductExec - #199
  • Support BroadcastNestedLoopJoinExec - #198

How to use/test Comet?

Hi Team,

I am trying to evaluate the performance of using the comet plugin.
But I did not find any documentations about how to use the comet plugin after compiling it.
Specifically, how to use it in the cluster mode.

Thanks

Comet sink operator should not have children operators

Describe the bug

This fixes incorrect native operator when transforming Spark query plan to Comet query plan. We now add children operators into native operator anyway in operator2Proto. But for Comet sink operators e.g. shuffle, they don't have children operators but work as a input source. This doesn't cause issue because we don't actually plan these children for sink operators in native planner, but the native operator (proto message) looks incorrect and confusing.

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Add a Github check for PR titles

What is the problem the feature request solves?

We should enforce PR titles to follow the conventional commits style, and in the format such as "feat: XXX".

Describe the potential solution

Add a Github checker on the PR titles and potentially label them accordingly. Perhaps we can explore whether the marketplace already has something handy (like this one?)

Additional context

No response

Pull based native execution

What is the problem the feature request solves?

Comet native execution's scan is not started from native but from JVM. Thus Comet scan is push-based instead of pull-based. Although we pull next input batches from child operator in JVM, this new input is not pulled from native but pushed from JVM side.

For an operator like Expand, one input batch can produces multiple output batches. So we cannot pull next batch directly and push into native without peeking it. We need to "peek" into native side and see if any more output batch there. If so, we take it as next output, if not, we pull next input batch and push into native side to execute on it.

If we pull next input from child operator and push it into native without peek, new input will be ignored.

Not only we cannot have consistent way to get input for native operators. The code of input/output to native execution is harder to understand because we mix push-based and pull-based processing modes. This patch tries to make native execution fully pull-based.

Describe the potential solution

No response

Additional context

No response

Speedup CI and reduce test run time

What is the problem the feature request solves?

It looks like that CIs are quite slow, the most time consuming parts varies from 60m to 100+m.
image

It would be great that we could speed them up and hopefully keep them under ~30m.

Describe the potential solution

No response

Additional context

No response

Add documentation on how to use Comet

What is the problem the feature request solves?

We should add some documentation in README (maybe a "Getting Started" section) on how to use Comet and what configurations are needed.

Describe the potential solution

Add more documentation

Additional context

No response

Support InSet expression in Comet

What is the problem the feature request solves?

As an optimization of In expression, InSet is also an expression commonly used. We can support it to unblock more queries.

Describe the potential solution

No response

Additional context

No response

Add support of TakeOrderedAndProjectExec in Comet

What is the problem the feature request solves?

TakeOrderedAndProjectExec is a common operator in Spark. In Comet, we should support it to increase query coverage.

Describe the potential solution

No response

Additional context

No response

Add CometBroadcastExchangeExec to support broadcasting the result of Comet native operator

What is the problem the feature request solves?

We need to support broadcasting the result of Comet native operator. This is another step towards supporting BroadcastHashJoinExec.

Like Spark BroadcastExchangeExec, CometBroadcastExchangeExec is not directly called with doExecute or doExecuteColumnar as other operators. CometBroadcastExchangeExec is used when Spark query planner inserts BroadcastDistribution on top of an operator to broadcast it. The upstream operator of CometBroadcastExchangeExec, e.g., BroadcastHashJoinExec, will call executeBroadcast method of CometBroadcastExchangeExec to execute the query plan and broadcast its results.

Describe the potential solution

No response

Additional context

No response

Support `CollectLimit` operator

What is the problem the feature request solves?

We already support GlobalLimitExec and LocalLimitExec, so it should be relatively straightforward to support CollectLimitExec as well.

Describe the potential solution

Add a new BosonCollectLimitExec operator to support native execution of Spark's CollectLimit operator.

Additional context

No response

data type Binary not supported in shuffle write

Describe the bug

Found the issue when CometCollectExec is added during #100.

The error msg is: data type Binary not supported in shuffle write

Reproduced in the local environment, the whole stack trace is :

org.apache.comet.CometNativeException: not implemented: data type Binary not supported in shuffle write
        at std::backtrace_rs::backtrace::libunwind::trace(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93)
        at std::backtrace_rs::backtrace::trace_unsynchronized(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/../../backtrace/src/backtrace/mod.rs:66)
        at std::backtrace::Backtrace::create(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/backtrace.rs:331)
        at comet::errors::init::{{closure}}(./core/src/errors.rs:134)
        at  as core::ops::function::Fn>::call(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/alloc/src/boxed.rs:2021)
        at std::panicking::rust_panic_with_hook(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:757)
        at std::panicking::begin_panic_handler::{{closure}}(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:631)
        at std::sys_common::backtrace::__rust_end_short_backtrace(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/sys_common/backtrace.rs:170)
        at rust_begin_unwind(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/std/src/panicking.rs:619)
        at core::panicking::panic_fmt(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/panicking.rs:72)
        at comet::execution::datafusion::shuffle_writer::slot_size(./core/src/execution/datafusion/shuffle_writer.rs:319)
        at comet::execution::datafusion::shuffle_writer::PartitionBuffer::init_active_if_necessary::{{closure}}(./core/src/execution/datafusion/shuffle_writer.rs:225)
        at core::iter::adapters::map::map_fold::{{closure}}(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/iter/adapters/map.rs:84)
        at core::iter::traits::iterator::Iterator::fold(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/iter/traits/iterator.rs:2639)
        at  as core::iter::traits::iterator::Iterator>::fold(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/iter/adapters/map.rs:124)
        at ::sum(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/iter/traits/accum.rs:50)
        at core::iter::traits::iterator::Iterator::sum(/rustc/ec08a0337f3556212525dbf1d3b41e19bdf27621/library/core/src/iter/traits/iterator.rs:3634)
        at comet::execution::datafusion::shuffle_writer::PartitionBuffer::init_active_if_necessary(./core/src/execution/datafusion/shuffle_writer.rs:221)
        at comet::execution::datafusion::shuffle_writer::PartitionBuffer::append_rows(./core/src/execution/datafusion/shuffle_writer.rs:246)
        at comet::execution::datafusion::shuffle_writer::PartitionBuffer::append_batch(./core/src/execution/datafusion/shuffle_writer.rs:237)
        at comet::execution::datafusion::shuffle_writer::ShuffleRepartitioner::insert_batch::{{closure}}(./core/src/execution/datafusion/shuffle_writer.rs:730)
        at comet::execution::datafusion::shuffle_writer::external_shuffle::{{closure}}(./core/src/execution/datafusion/shuffle_writer.rs:930)

It's clear that slot_size and append_columns doesn't include binary support.

Steps to reproduce

No response

Expected behavior

Binary data could be wrote by shuffle writer.

Additional context

I can submit a PR for this.

Exclude .github from apache-rat-plugin check

What is the problem the feature request solves?

Running make test locally gets rat check on the files under .github. These files don't require license header.

Describe the potential solution

No response

Additional context

No response

MacOS (x86_64) CI is flaky with libcrypto issue

Describe the bug

Github CI for MacOS x86_64 is currently flaky with message:

WARNING: /Users/runner/hostedtoolcache/Java_Zulu_jdk/8.0.402-6/x64/jre/bin/java is loading libcrypto in an unsafe way

Some attempts have been made in #55 and #41, but it is still not resolved.

Steps to reproduce

Check the post-commit CI runs and occasionally it still fails with the above message.

Expected behavior

The CI for MacOS x86_64 should pass consistently.

Additional context

No response

Support multiple input sources for CometNativeExec

What is the problem the feature request solves?

CometNativeExec currently limits the number of input source. That blocks the operators with multiple input sources like join operator. This patch generalizes the input source handling to remove the limitation.

Describe the potential solution

No response

Additional context

No response

Support Emit::First for SumDecimalGroupsAccumulator

What is the problem the feature request solves?

If the upstream operator of HashAggregate is sorted, HashAggregate will try to emit first groups if possible. Our custom SumDecimalGroupsAccumulator doesn't support Emit::First. So if Sort is followed by HashAggregate and there is sum of decimal, the query will fail with:

        at std::backtrace::Backtrace::create(__internal__:0)                                                                                                                                                                           
        at comet::errors::init::{{closure}}(__internal__:0)                                                                                                                                                                            
        at std::panicking::rust_panic_with_hook(__internal__:0)                                                                                                                                                                        
        at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)                                                                                                                                                            
        at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0)                                                                                                                                                      
        at _rust_begin_unwind(__internal__:0)                                                                                                                                                                                          
        at core::panicking::panic_fmt(__internal__:0)                                                                                                                                                                                  
        at <comet::execution::datafusion::expressions::sum_decimal::SumDecimalGroupsAccumulator as datafusion_physical_expr::aggregate::groups_accumulator::GroupsAccumulator>::state(__internal__:0)                                  
        at datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream::emit(__internal__:0)                                                                                                                            
        at <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next(__internal__:0)                                                                                     
        at <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next(__internal__:0)                                                                                     
        at <datafusion_physical_plan::projection::ProjectionStream as futures_core::stream::Stream>::poll_next(__internal__:0)                                                                                                         
        at std::panicking::try::do_call(__internal__:0)                                                                                                                                                                                
        at _Java_org_apache_comet_Native_executePlan(__internal__:0)     

Describe the potential solution

Support Emit::First for SumDecimalGroupsAccumulator

Additional context

No response

Create multi-arch platform for testing and release process

As Comet supports multiple architectures including x86_64, aarch64 (e.g., Apple Silicon), arm64 etc, we need to add CI pipelines to run tests and create releases for these. In particular, given AWS Graviton is widely used to run Spark jobs, it's important to have support for it.

Add golden files for TPC-H/TPC-DS suites

We currently have a bunch of TPC-H/TPC-DS test suites including CometTPCDSQuerySuite, CometPlanStabilitySuite, etc. In order to use these, we need to add the golden files generated via Comet first.

date and timestamp trunc should support an array of formats

Describe the bug

Spark's trunc and date_trunc functions take a format as input where the format may be either a scalar string or an array of strings.
Comet's current implementation only supports a format that is a scalar string.

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Upgrade DF to 36.0.0, arrow-rs to 50.0.0

What is the problem the feature request solves?

Upgrade Comet dependencies to most recent DF to 36.0.0, arrow-ts to 50.0.0 to benefit with latest fixes

Describe the potential solution

No response

Additional context

No response

Upgrade `jni-rs` to 0.21

What is the problem the feature request solves?

We are currently using 0.19.0 which is a bit old already. The latest release provides updated safety, better JVM handling (for testing), and avoids some data copying. It isn't a straight upgrade, and will require some migration (see 0.21 Migration)

Describe the potential solution

Upgrade jni-rs to 0.21 for Comet.

Additional context

No response

Nested map support for columnar shuffle

What is the problem the feature request solves?

Columnar shuffle supports struct and list types. We should go to support map type as well.

Describe the potential solution

No response

Additional context

No response

Support Count(Distinct) (and similar) aggregation

What is the problem the feature request solves?

We should also support aggregations such as count(distinct(col)) from tbl. In Spark,

SELECT COUNT(DISTINCT(_1)) FROM tbl

produces a plan like the following:

AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(distinct _1#9)], output=[count(DISTINCT _1)#16L])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=40]
      +- HashAggregate(keys=[], functions=[partial_count(distinct _1#9)], output=[count#20L])
         +- HashAggregate(keys=[_1#9], functions=[], output=[_1#9])
            +- Exchange hashpartitioning(_1#9, 5), ENSURE_REQUIREMENTS, [plan_id=37]
               +- HashAggregate(keys=[_1#9], functions=[], output=[_1#9])
                  +- Scan parquet [_1#9] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>

Describe the potential solution

Add the support for COUNT(DISTINCT) (and similar), so the Spark physical plan can be properly converted to a native plan and executed.

Additional context

No response

Build with Spark 3.2 failed

Describe the bug

comet fails to compile with Spark version 3.2

[ERROR] Failed to execute goal com.diffplug.spotless:spotless-maven-plugin:2.43.0:check (default) on project comet-parent-spark3.2_2.12: Execution default of goal com.diffplug.spotless:spotless-maven-plugin:2.43.0:check failed: Unable to load the mojo 'check' in the plugin 'com.diffplug.spotless:spotless-maven-plugin:2.43.0' due to an API incompatibility: org.codehaus.plexus.component.repository.exception.ComponentLookupException: com/diffplug/spotless/maven/SpotlessCheckMojo has been compiled by a more recent version of the Java Runtime (class file version 55.0), this version of the Java Runtime only recognizes class file versions up to 52.0
[ERROR] spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala:1069: value offset is not a member of org.apache.spark.sql.DataFrame

Steps to reproduce

PROFILES="-Pspark-3.2" make

Expected behavior

Compilation passes

Additional context

No response

Cast string to boolean not compatible with Spark

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val inputs = Seq("TR", "FA").toDF("n")
inputs: org.apache.spark.sql.DataFrame = [n: string]

scala> inputs.write.parquet("test.parquet")
24/02/13 07:58:44 INFO src/lib.rs: Comet native library initialized
                                                                                
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [n: string]

scala> val df2 = df.withColumn("converted", col("n").cast(DataTypes.BooleanType))
df2: org.apache.spark.sql.DataFrame = [n: string, converted: boolean]

scala> df2.show
+---+---------+                                                                 
|  n|converted|
+---+---------+
| FA|    false|
| TR|     true|
+---+---------+

scala> spark.conf.set("spark.comet.enabled", false)

scala> df2.show
+---+---------+
|  n|converted|
+---+---------+
| FA|     null|
| TR|     null|
+---+---------+

Parquet column with integer logical type cannot read as Spark date column

Describe the bug

Date type is not decoded expectedly

For the table that has a Date type column (EventDate),

create external table hits (
    EventDate DATE NOT NULL, ...

EventDate column isn't decoded as Date type.

>>> df = spark.sql("SELECT EventDate FROM hits limit 5")
>>> df
DataFrame[EventDate: date]
>>> df.show()
+---------+
|EventDate|
+---------+
|    15901|
|    15901|
|    15901|
|    15901|
|    15901|
+---------+
# integer value, not date!

MIN/MAX aggregation fails with Date type

>>> spark.sql("SELECT MIN(EventDate) FROM hits;").show()
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.comet.CometRuntimeException: Internal error: MIN/MAX is not expected to receive scalars of incompatible types (Date32("NULL"), Int32(15901)).
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:77)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:116)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:166)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Steps to reproduce

  1. Download https://github.com/ClickHouse/ClickBench?tab=readme-ov-file#data-loading
  2. Create table with the DDL below.
  3. Run queries
create external table hits
(
    WatchID BIGINT NOT NULL,
    JavaEnable SMALLINT NOT NULL,
    Title STRING NOT NULL,
    GoodEvent SMALLINT NOT NULL,
    EventTime BIGINT NOT NULL,
    EventDate DATE NOT NULL,
    CounterID INT NOT NULL,
    ClientIP INT NOT NULL,
    RegionID INT NOT NULL,
    UserID BIGINT NOT NULL,
    CounterClass SMALLINT NOT NULL,
    OS SMALLINT NOT NULL,
    UserAgent SMALLINT NOT NULL,
    URL STRING NOT NULL,
    Referer STRING NOT NULL,
    IsRefresh SMALLINT NOT NULL,
    RefererCategoryID SMALLINT NOT NULL,
    RefererRegionID INT NOT NULL,
    URLCategoryID SMALLINT NOT NULL,
    URLRegionID INT NOT NULL,
    ResolutionWidth SMALLINT NOT NULL,
    ResolutionHeight SMALLINT NOT NULL,
    ResolutionDepth SMALLINT NOT NULL,
    FlashMajor SMALLINT NOT NULL,
    FlashMinor SMALLINT NOT NULL,
    FlashMinor2 STRING NOT NULL,
    NetMajor SMALLINT NOT NULL,
    NetMinor SMALLINT NOT NULL,
    UserAgentMajor SMALLINT NOT NULL,
    UserAgentMinor STRING NOT NULL,
    CookieEnable SMALLINT NOT NULL,
    JavascriptEnable SMALLINT NOT NULL,
    IsMobile SMALLINT NOT NULL,
    MobilePhone SMALLINT NOT NULL,
    MobilePhoneModel STRING NOT NULL,
    Params STRING NOT NULL,
    IPNetworkID INT NOT NULL,
    TraficSourceID SMALLINT NOT NULL,
    SearchEngineID SMALLINT NOT NULL,
    SearchPhrase STRING NOT NULL,
    AdvEngineID SMALLINT NOT NULL,
    IsArtifical SMALLINT NOT NULL,
    WindowClientWidth SMALLINT NOT NULL,
    WindowClientHeight SMALLINT NOT NULL,
    ClientTimeZone SMALLINT NOT NULL,
    ClientEventTime bigint NOT NULL,
    SilverlightVersion1 SMALLINT NOT NULL,
    SilverlightVersion2 SMALLINT NOT NULL,
    SilverlightVersion3 INT NOT NULL,
    SilverlightVersion4 SMALLINT NOT NULL,
    PageCharset STRING NOT NULL,
    CodeVersion INT NOT NULL,
    IsLink SMALLINT NOT NULL,
    IsDownload SMALLINT NOT NULL,
    IsNotBounce SMALLINT NOT NULL,
    FUniqID BIGINT NOT NULL,
    OriginalURL STRING NOT NULL,
    HID INT NOT NULL,
    IsOldCounter SMALLINT NOT NULL,
    IsEvent SMALLINT NOT NULL,
    IsParameter SMALLINT NOT NULL,
    DontCountHits SMALLINT NOT NULL,
    WithHash SMALLINT NOT NULL,
    HitColor STRING NOT NULL,
    LocalEventTime bigint NOT NULL,
    Age SMALLINT NOT NULL,
    Sex SMALLINT NOT NULL,
    Income SMALLINT NOT NULL,
    Interests SMALLINT NOT NULL,
    Robotness SMALLINT NOT NULL,
    RemoteIP INT NOT NULL,
    WindowName INT NOT NULL,
    OpenerName INT NOT NULL,
    HistoryLength SMALLINT NOT NULL,
    BrowserLanguage STRING NOT NULL,
    BrowserCountry STRING NOT NULL,
    SocialNetwork STRING NOT NULL,
    SocialAction STRING NOT NULL,
    HTTPError SMALLINT NOT NULL,
    SendTiming INT NOT NULL,
    DNSTiming INT NOT NULL,
    ConnectTiming INT NOT NULL,
    ResponseStartTiming INT NOT NULL,
    ResponseEndTiming INT NOT NULL,
    FetchTiming INT NOT NULL,
    SocialSourceNetworkID SMALLINT NOT NULL,
    SocialSourcePage STRING NOT NULL,
    ParamPrice BIGINT NOT NULL,
    ParamOrderID STRING NOT NULL,
    ParamCurrency STRING NOT NULL,
    ParamCurrencyID SMALLINT NOT NULL,
    OpenstatServiceName STRING NOT NULL,
    OpenstatCampaignID STRING NOT NULL,
    OpenstatAdID STRING NOT NULL,
    OpenstatSourceID STRING NOT NULL,
    UTMSource STRING NOT NULL,
    UTMMedium STRING NOT NULL,
    UTMCampaign STRING NOT NULL,
    UTMContent STRING NOT NULL,
    UTMTerm STRING NOT NULL,
    FromTag STRING NOT NULL,
    HasGCLID SMALLINT NOT NULL,
    RefererHash BIGINT NOT NULL,
    URLHash BIGINT NOT NULL,
    CLID INT NOT NULL
)
using parquet
location './hits/';

Expected behavior

If DDL specifies Date type, the column should be decoded as Date type.

Additional context

$ duckdb -c "from parquet_schema('hits.parquet')"
┌──────────────┬───────────────────────┬────────────┬─────────────┬─────────────────┬──────────────┬────────────────┬───────┬───────────┬──────────┬────────────────────────────────┐
│  file_name   │         name          │    type    │ type_length │ repetition_type │ num_children │ converted_type │ scale │ precision │ field_id │          logical_type          │
│   varchar    │        varchar        │  varchar   │   varchar   │     varchar     │    int64     │    varchar     │ int64 │   int64   │  int64   │            varchar             │
├──────────────┼───────────────────────┼────────────┼─────────────┼─────────────────┼──────────────┼────────────────┼───────┼───────────┼──────────┼────────────────────────────────┤
...
│ hits.parquet │ EventDate             │ INT32      │             │ REQUIRED        │              │ UINT_16        │       │           │          │ IntType(bitWidth=, isSigned=0) │
...

Include Rust's native stack trace in CometNativeException

What is the problem the feature request solves?

Currently, the backtrace is only added when rust panics.

However, when the exception is thrown back to JVM side. The error msg sometimes might be not that helpful. It would be ideal to get the backtrace for the rust error as well.

Describe the potential solution

No response

Additional context

No response

Comet native shuffle in rust doesn't handle empty projection properly

Describe the bug

When testing #100, I noticed that Comet's columnar shuffle doesn't handle empty projection correctly. The shuffle write
thread throws an exception as follows:

Caused by: org.apache.comet.CometNativeException: Arrow error: External error: Arrow error: Invalid argument error: must either specify a row count or at least one column
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:65)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:111)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:126)
	at org.apache.spark.sql.comet.execution.shuffle.CometShuffleWriteProcessor.write(CometShuffleExchangeExec.scala:290)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Steps to reproduce

In org.apache.comet.exec.CometExecSuite, modify the empty projection test case as following:

  test("empty projection") {
    withParquetDataFrame((0 until 5).map(i => (i, i + 1))) { df =>
      assert(df.where("_1 IS NOT NULL").count() == 5)
      checkSparkAnswerAndOperator(df)
      assert(df.select().limit(2).count() === 2)
    }
  }

Expected behavior

Test case could passed correctly.

Additional context

No response

Add Shuffle support

What is the problem the feature request solves?

Open sourcing shuffle features to Comet

Describe the potential solution

No response

Additional context

No response

Comet returns different results for 5 TPCDS queries compared with Spark

Describe the bug

TPC-DS q34, q66, q64, q71, q88, q90, q96 are disabled right now because they generate different results from Spark.

q34 seems something to do with sorting. The order is different.

  java.lang.Exception: Expected "...NULL	NULL	NULL	NULL	[126143	15
NULL	NULL	NULL	NULL	21529]3	15
NULL	NULL	Mrs. ...", but got "...NULL	NULL	NULL	NULL	[215293	15
NULL	NULL	NULL	NULL	12614]3	15
NULL	NULL	Mrs. ..." Result did not match

q64 is also about sorting

  java.lang.Exception: Expected "...  	ation	35709     	[996       	NULL	Bridgeport	65817     	752       	Lakeview Lincoln	Friendship	74536     	1999	1	15.78	24.93	0.00	17.09	26.31	0.00	2000	1
ablepricallyantiought                             	ation	35709     	71        	River River	Friendship	34536     	NULL	NULL	Newport	NULL	1999	1	22.60	38.87	0.00	17.09	26.31	0.00	2000	1
...", but got "...  	ation	35709     	[71        	River River	Friendship	34536     	NULL	NULL	Newport	NULL	1999	1	22.60	38.87	0.00	17.09	26.31	0.00	2000	1
ablepricallyantiought                             	ation	35709     	996       	NULL	Bridgeport	65817     	752       	Lakeview Lincoln	Friendship	74536     	1999	1	15.78	24.93	0.00	17.09	26.31	0.00	2000	1
..." Result did not match

Steps to reproduce

Enable these 3 queries in CometTPCDSQuerySuite and run the test suite.

Expected behavior

Comet should produce the same query results as Spark for these 3 queries.

Additional context

No response

Introduce unified memory manager

What is the problem the feature request solves?

Currently Comet has separate memory management from JVM and native: on JVM side, it uses Spark's memory manager, while on native side, it uses DF's memory manager. This poses an issue when we have both Spark and native operators in a single query, since the split is static and we cannot request & spill memory from native to JVM, or vice versa.

In contrast, Photon uses Spark's memory manager to manage memory from both JVM and native:

Photon and Apache Spark share the same cluster and thus must have a consistent view of memory and disk usage to avoid being OOM-killed by the OS or the JVM. As a result, Photon hooks into Apache Spark’s memory manager. To handle this, we separate the concept of memory reservations from allocations in Photon. A memory reservation asks for memory from Spark’s unified memory manager. Like all requests to the memory manager, this can cause a spill, where Spark asks some memory consumer to release memory to satisfy a new request. Photon hooks into this memory consumer API, so Spark can ask Photon to spill data on behalf of other memory-consuming operators that are executing (e.g., a sort task in Spark may ask a Photon operator to spill). Similarly, Photon can make reservations that cause other Spark operators to spill, or cause Photon itself to spill (leading to a “recursive spill” where one Photon operator spills memory on behalf of another one). This differs slightly from many other database engines, where operators are given a fixed memory budget and can only “self-spill.” Spilling is dynamic because we often do not have information on how much data an operator will consume, especially if SQL operators are co-existing with user-defined code that also reserves memory. We use the same policy as open source Apache Spark to determine which operator to spill. To summarize, if we need to spill 𝑁 bytes to satisfy a memory reservation request, we sort the memory consumers from least to most allocated memory, and spill the first consumer that holds at least 𝑁 bytes. The rationale behind this is that we minimize the number of spills and avoid spilling more data than necessary. After reserving memory, Photon can allocate memory safely without spilling. Allocation is purely local to Photon: Spark only accounts for the memory that Photon asks for. In spilling operators such as hash join and grouping aggregation the processing of an input batch is thus broken up into two phases: a reservation phase where memory is acquired for the new input batch and spilling is handled, and an allocation phase where transient data can be produced since no spilling can occur. This flexible spilling mechanism has been critical in the Lakehouse context, where queries often exceed the available memory

In particular:

Photon hooks into this memory consumer API, so Spark can ask Photon to spill data on behalf of other memory-consuming operators that are executing (e.g., a sort task in Spark may ask a Photon operator to spill). Similarly, Photon can make reservations that cause other Spark operators to spill, or cause Photon itself to spill (leading to a “recursive spill” where one Photon operator spills memory on behalf of another one)

which is a better approach. Note the pre-requisite for this is that the memory manager should be using off-heap memory, so that it can be shared between JVM and native.

On Comet side, DF already supports a pluggable memory manager through its memory_pool API. This allows us to implement an adapter class on the Java side to delegate the calls to Spark's memory manager via JNI.

Unlike Spark, DF doesn't trigger spilling from registered memory consumers when under pressure (removed via apache/datafusion#4522). This means the behavior will differ depending on whether the memory consumer is on the JVM side or native side: when a memory consumer is requesting more memory than currently available, the memory manager will trigger spill from memory consumers from the JVM, but not native.

Another requirement is for Spark to switch to use off-heap memory when the unified memory manager is active. By default, Spark uses on-heap memory, so this will require configuration changes from users, which is inconvenient. We could leverage Spark's DriverPlugin to overwrite memory setting so this transition becomes transparent. For instance, if user set spark.executor.memory to be 64G. The driver plugin could overwrite the memory setting to be:

  • spark.memory.offHeap.enabled: true
  • spark.executor.memory: 64 * 0.3
  • spark.memory.offHeap.size: 64 * (1 - 0.3)

This assumes we have a configuration spark.comet.memory.onHeap.fraction which specifies how much memory should be reserved for on-heap. The value is 0.3 in the above example.

Describe the potential solution

Introduce a new memory_pool implementation for Comet, which forwards the memory allocation / deallocation calls to the Spark's unified memory manager instance.

Additional context

No response

Appending null values to element array builders of StructBuilder for null row in a StructArray

Describe the bug

When encountering a null row, besides appending a null value to StructBuilder by calling its append_null, we also need to append null values to all its element array builders, so their lengths are kept the same.

Otherwise, when building the StructArray, arrow will report the error that our customer found during testing columnar shuffle:

```
Caused by: org.apache.comet.CometRuntimeException: StructBuilder and field_builders are of unequal lengths.
```

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

Cast string to integral type not compatible with Spark

scala> val inputs = Seq("123", "-123", "86374").toDF("n")
inputs: org.apache.spark.sql.DataFrame = [n: string]

scala> inputs.write.parquet("test.parquet")
24/02/13 07:40:14 INFO src/lib.rs: Comet native library initialized
                                                                                
scala> val df = spark.read.parquet("test.parquet")
df: org.apache.spark.sql.DataFrame = [n: string]

scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._

scala> val df2 = df.withColumn("converted", col("n").cast(DataTypes.ShortType))
df2: org.apache.spark.sql.DataFrame = [n: string, converted: smallint]

scala> df2.show
+-----+---------+
|    n|converted|
+-----+---------+
|86374|    20838|
| -123|     -123|
|  123|      123|
+-----+---------+

scala> spark.conf.set("spark.comet.enabled", false)

scala> df2.show
+-----+---------+
|    n|converted|
+-----+---------+
|86374|     null|
| -123|     -123|
|  123|      123|
+-----+---------+

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.