delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python.

Home Page: https://delta.io

License: Apache License 2.0

Scala 45.00% Shell 0.24% Java 11.23% Dockerfile 0.01% Python 1.01% ANTLR 0.05% HCL 0.08% HTML 41.72% JavaScript 0.04% CSS 0.63%
spark acid big-data analytics delta-lake

delta's Introduction

Delta Lake Logo


Delta Lake is an open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs for Scala, Java, Rust, Ruby, and Python.

The following are some of the more popular Delta Lake integrations; refer to delta.io/integrations for the complete list:

  • Apache Spark™: This connector allows Apache Spark™ to read from and write to Delta Lake.
  • Apache Flink (Preview): This connector allows Apache Flink to write to Delta Lake.
  • PrestoDB: This connector allows PrestoDB to read from Delta Lake.
  • Trino: This connector allows Trino to read from and write to Delta Lake.
  • Delta Standalone: This library allows Scala and Java-based projects (including Apache Flink, Apache Hive, Apache Beam, and PrestoDB) to read from and write to Delta Lake.
  • Apache Hive: This connector allows Apache Hive to read from Delta Lake.
  • Delta Rust API: This library gives Rust (with Python and Ruby bindings) low-level access to Delta tables and is intended to be used with data processing frameworks like datafusion, ballista, rust-dataframe, vega, etc.


Latest Binaries

See the online documentation for the latest release.

API Documentation

Compatibility

The Delta Standalone library is a single-node Java library that can be used to read from and write to Delta tables. Specifically, it provides APIs to interact with a table's metadata in the transaction log, implementing the Delta Transaction Log Protocol to achieve the transactional guarantees of the Delta Lake format.
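
For illustration, here is a minimal sketch of inspecting a table with Delta Standalone; the io.delta.standalone calls follow the published Standalone API, and the table path is a placeholder:

import io.delta.standalone.DeltaLog
import org.apache.hadoop.conf.Configuration

// Open the transaction log of an existing Delta table and look at the latest snapshot.
val log = DeltaLog.forTable(new Configuration(), "/tmp/delta-table")
val snapshot = log.snapshot()
println(s"Current table version: ${snapshot.getVersion}")
// List the data files that make up this version of the table.
snapshot.getAllFiles.forEach(f => println(f.getPath))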

API Compatibility

There are two types of APIs provided by the Delta Lake project.

  • Direct Java/Scala/Python APIs - The classes and methods documented in the API docs are considered stable public APIs. All other classes, interfaces, and methods that may be directly accessible in code are considered internal, and they are subject to change across releases.
  • Spark-based APIs - You can read Delta tables through the DataFrameReader/Writer (i.e. spark.read, df.write, spark.readStream and df.writeStream); see the sketch after this list for an example. Options to these APIs will remain stable within a major release of Delta Lake (e.g., 1.x.x).
  • See the online documentation for the releases and their compatibility with Apache Spark versions.
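
A brief hedged sketch of the Spark-based APIs above, assuming an active SparkSession named spark and placeholder paths:

// Batch read and write through the stable DataFrameReader/Writer surface.
val df = spark.read.format("delta").load("/tmp/delta-table")
df.write.format("delta").mode("append").save("/tmp/delta-table-copy")

// Streaming reads and writes go through readStream/writeStream the same way.
val streamDf = spark.readStream.format("delta").load("/tmp/delta-table")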

Data Storage Compatibility

Delta Lake guarantees backward compatibility for all Delta Lake tables (i.e., newer versions of Delta Lake will always be able to read tables written by older versions of Delta Lake). However, we reserve the right to break forward compatibility as new features are introduced to the transaction protocol (i.e., an older version of Delta Lake may not be able to read a table produced by a newer version).

Breaking changes in the protocol are indicated by incrementing the minimum reader/writer version in the Protocol action.
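
For illustration, a Protocol action is recorded as a JSON entry in a _delta_log commit file, roughly like the following (field names per the Delta Transaction Log Protocol; the version numbers are examples only):

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}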

Roadmap

Transaction Protocol

The Delta Transaction Log Protocol document provides a specification of the transaction protocol.

Requirements for Underlying Storage Systems

Delta Lake ACID guarantees are predicated on the atomicity and durability guarantees of the storage system. Specifically, we require the storage system to provide the following.

  1. Atomic visibility: There must be a way for a file to be visible in its entirety or not visible at all.
  2. Mutual exclusion: Only one writer must be able to create (or rename) a file at the final destination.
  3. Consistent listing: Once a file has been written in a directory, all future listings for that directory must return that file.

See the online documentation on Storage Configuration for details.
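
As a hedged configuration sketch, the storage system is selected by choosing a LogStore implementation on the Spark session. The configuration key and class name below follow the pre-1.0 naming (HDFSLogStore is the default), so treat them as assumptions and check the Storage Configuration docs for your release:

import org.apache.spark.sql.SparkSession

// Sketch: pick a LogStore whose guarantees match your storage system
// (atomic visibility, mutual exclusion, consistent listing).
val spark = SparkSession.builder()
  .appName("delta-storage-config")
  .config("spark.delta.logStore.class",
    "org.apache.spark.sql.delta.storage.HDFSLogStore")
  .getOrCreate()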

Concurrency Control

Delta Lake ensures serializability for concurrent reads and writes. Please see Delta Lake Concurrency Control for more details.
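
As a minimal sketch of handling a lost conflict check under this model, assuming an existing DataFrame df, a placeholder path, and that the conflict surfaces as a java.util.ConcurrentModificationException subclass (the exact Delta exception classes vary across releases, so this is an assumption):

import java.util.ConcurrentModificationException

// Sketch: retry a Delta write a few times if a concurrent transaction wins the conflict check.
def writeWithRetries(maxAttempts: Int)(write: => Unit): Unit = {
  var attempt = 1
  var done = false
  while (!done) {
    try { write; done = true }
    catch {
      case _: ConcurrentModificationException if attempt < maxAttempts =>
        attempt += 1 // a concurrent commit won; re-attempt against the new table state
    }
  }
}

writeWithRetries(3) {
  df.write.format("delta").mode("append").save("/tmp/delta-table")
}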

Reporting issues

We use GitHub Issues to track community-reported issues. You can also contact the community to get answers.

Contributing

We welcome contributions to Delta Lake. See our CONTRIBUTING.md for more details.

We also adhere to the Delta Lake Code of Conduct.

Building

Delta Lake is compiled using SBT.

To compile, run

build/sbt compile

To generate artifacts, run

build/sbt package

To execute tests, run

build/sbt test

To execute a single test suite, run

build/sbt spark/'testOnly org.apache.spark.sql.delta.optimize.OptimizeCompactionSQLSuite'

To execute a single test within a single test suite, run

build/sbt spark/'testOnly *.OptimizeCompactionSQLSuite -- -z "optimize command: on partitioned table - all partitions"'

Refer to SBT docs for more commands.

IntelliJ Setup

IntelliJ is the recommended IDE to use when developing Delta Lake. To import Delta Lake as a new project:

  1. Clone Delta Lake into, for example, ~/delta.
  2. In IntelliJ, select File > New Project > Project from Existing Sources... and select ~/delta.
  3. Under Import project from external model select sbt. Click Next.
  4. Under Project JDK specify a valid Java 1.8 JDK and opt to use SBT shell for project reload and builds.
  5. Click Finish.

Setup Verification

After waiting for IntelliJ to index, verify your setup by running a test suite in IntelliJ.

  1. Search for and open DeltaLogSuite.
  2. Next to the class declaration, right-click on the two green arrows and select Run 'DeltaLogSuite'.

Troubleshooting

If you see errors of the form

Error:(46, 28) object DeltaSqlBaseParser is not a member of package io.delta.sql.parser
import io.delta.sql.parser.DeltaSqlBaseParser._
...
Error:(91, 22) not found: type DeltaSqlBaseParser
    val parser = new DeltaSqlBaseParser(tokenStream)

then follow these steps:

  1. Compile using the SBT CLI: build/sbt compile.
  2. Go to File > Project Structure... > Modules > delta-spark.
  3. In the right panel under Source Folders remove any target folders, e.g. target/scala-2.12/src_managed/main [generated]
  4. Click Apply and then re-run your test.

License

Apache License 2.0, see LICENSE.

Community

There are two main channels of communication within the Delta Lake community.

delta's People

Contributors

allisonport-db, andreaschat-db, brkyvz, cloud-fan, cstavr, dhruvarya-db, fred-db, husseinnagr-db, jackierwzhang, johanl-db, jose-torres, kristoffsc, larsk-db, liwensun, longvu-db, lzlfred, mengtong-db, mingdai-db, prakharjain09, pranavanand, rahulsmahadev, ryan-johnson-databricks, sabir-akhadov, scottsand-db, tdas, tomvanbussel, vkorukanti, xupefei, yijiacui-db, zsxwing


delta's Issues

Manifest file for my delta table

Hi, is there a way that I could create a manifest file for my Delta table, to use Hive's SymlinkTextInputFormat? I want to query my Delta table using Presto.

Thanks

Is there any plan for compaction?

Compaction is very important when working with streaming jobs. So does the Delta community have any plan for compaction? Will the core team implement it, or just wait for a third party to contribute?
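
For reference, a commonly used manual compaction sketch: read the table, repartition to fewer files, and overwrite it in place. The dataChange option only appears in later Delta releases, so treat it as an assumption; the path and file count are placeholders:

// Sketch: rewrite many small files into a smaller number of larger ones.
val path = "/tmp/delta-table"
spark.read.format("delta").load(path)
  .repartition(16)                      // target number of output files
  .write
  .format("delta")
  .option("dataChange", "false")        // assumption: only in newer releases; marks the rewrite as data-preserving
  .mode("overwrite")
  .save(path)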

Can I create hive table on top of delta?

We have a data lake based on the Lambda architecture to solve real-time and batch data sink problems, for which we are using Hive as a datastore.

So my question is: if I use Delta, is it possible to create a Hive table on top of that?

org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated

Trying to run

./spark-shell --packages io.delta:delta-core_2.11:0.1.0

&

val df = spark.read.format("delta").load(deltaPath)

This error gets thrown:

java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.delta.sources.DeltaDataSource could not be instantiated
  at java.base/java.util.ServiceLoader.fail(ServiceLoader.java:581)
  at java.base/java.util.ServiceLoader$ProviderImpl.newInstance(ServiceLoader.java:803)
  at java.base/java.util.ServiceLoader$ProviderImpl.get(ServiceLoader.java:721)
  at java.base/java.util.ServiceLoader$3.next(ServiceLoader.java:1394)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:44)
  at scala.collection.Iterator.foreach(Iterator.scala:941)
  at scala.collection.Iterator.foreach$(Iterator.scala:941)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:250)
  at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:248)
  at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
  at scala.collection.TraversableLike.filter(TraversableLike.scala:262)
  at scala.collection.TraversableLike.filter$(TraversableLike.scala:262)
  at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:630)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  ... 49 elided
Caused by: java.lang.NoClassDefFoundError: org/apache/spark/internal/Logging$class
  at org.apache.spark.sql.delta.sources.DeltaDataSource.<init>(DeltaDataSource.scala:42

Environment

Scala 2.11.12
Spark 2.4.2
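
A likely cause, offered as a hedged note: Spark 2.4.2 is pre-built with Scala 2.12, so the _2.11 Delta artifact cannot load (hence the NoClassDefFoundError on a Scala 2.11 trait class). Matching the Scala suffix to the Spark build should help, for example:

./spark-shell --packages io.delta:delta-core_2.12:0.1.0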

Delta Lake is not compatible with pyspark 2.4.3

Hi Delta Lake team,

I tried to set up Delta Lake by following this instruction and got the same error as described in this issue. I also left a comment about the error and my environment in that issue.

Today I tried to downgrade my pyspark version to 2.4.2 and it worked:

pyspark --packages io.delta:delta-core_2.12:0.1.0
Python 2.7.16 (default, Apr 12 2019, 15:32:40)
[GCC 4.2.1 Compatible Apple LLVM 10.0.1 (clang-1001.0.46.3)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Ivy Default Cache set to: /Users/xuc/.ivy2/cache
The jars for the packages stored in: /Users/xuc/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python2.7/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bbcee9fd-e9b9-444d-88ac-389b062944c6;1.0
	confs: [default]
	found io.delta#delta-core_2.12;0.1.0 in central
:: resolution report :: resolve 195ms :: artifacts dl 2ms
	:: modules in use:
	io.delta#delta-core_2.12;0.1.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-bbcee9fd-e9b9-444d-88ac-389b062944c6
	confs: [default]
	0 artifacts copied, 1 already retrieved (0kB/6ms)
19/05/31 12:05:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Python version 2.7.16 (default, Apr 12 2019 15:32:40)
SparkSession available as 'spark'.
>>> data = spark.range(5, 10)
>>> data.write.format("delta").mode("overwrite").save("/tmp/delta-table")
>>> exit()

I wonder if there is some compatibility issue?

Useful for distributing a temporal NoSQL storage system?

Hi,

I'm currently thinking about how to best distribute (replicate/partition and query) an Open Source temporal NoSQL storage system (https://sirix.io), which is able to both store XML and JSON in a similar binary encoding and which allows time travel queries through XQuery.

I know Apache Ignite quite a bit, at least I've worked with the data grid API. Maybe in the first step simply replicating the data through a single master, which may distribute writes both asynchronously and synchronously to its followers/slaves, would be the way to go. I want to preserve ACID transactional semantics through some kind of consensus protocol and the single master. I don't think that a two-phase protocol (which Ignite seems to use) is the best, though.

But I'm tempted to use Apache Ignite for distributing a transaction log from a single master, though I haven't used Apache Spark or your Delta Lake (which I probably could also use for distributing/executing the queries on each node!?).

I'm currently not sure which system would fit best... Or simply for a start use Zookeeper!?

Kind regards
Johannes

Open Source Compatibility check

Is there an easy way or utility to check whether Delta code written on Databricks is compatible with open-source Delta?
This can be useful in environments that develop/test in the cloud but need production to be on-premise.
Curious to hear thoughts on this.

KMS encrypted bucket gives 403

KMS encrypted bucket gives 403

  df.write.format("delta").save(s"s3a://$s3BucketName/$tableName")

java.nio.file.AccessDeniedException: s3a://bucket/table/_delta_log: innerMkdirs on s3a://bucket/table/_delta_log: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; 

delta (parquet) format

Hi Delta team,

I tried Delta; it's interesting. I have a few questions.

Even though we use the "delta" format, its underlying format is "parquet". So is it possible to use this Spark Delta format to read my existing Parquet data that was written without using Delta?

Why does it support Delta only for Parquet? Why not for other Spark-supported formats? Do you have them planned for the future?

I'm able to read and write data from and to this Delta lake. Is it possible to see that data in the Delta lake just like we can see HDFS data from the HUE UI?

Many thanks,
Ram

Delta log scanned too frequently

We observed that Delta logs are scanned many times during the run of a SQL query, which submits too many small queries to Spark. The main query was even blocked waiting for the scheduler to become ready. We should make the metadata processing (i.e., getting the current snapshot) more efficient.

support for external table in AWS

Hi,
Users building a data lake in AWS, or in other systems, use EXTERNAL TABLES quite a lot. This gives their solutions high flexibility, for example having either a personal metastore or using the Glue Data Catalog.

It would be great if Delta support were included as a storage class for Hive metastores as well. I think we can and do read data using Hive or Spark directly from Delta write locations, but that gives us duplicates, because it is just reading the Parquet files.

Regards,
Gourav

Document Databricks Delta features not supported in OSS Delta

Explicitly describe those Databricks features that are not supported in OSS Delta. There are repeated Slack questions about this, especially regarding Delta-specific SQL commands.

For example:

  • DESCRIBE HISTORY my_table
  • DESCRIBE DETAIL my_table
  • SELECT * FROM my_table VERSION AS OF 5
  • Merge syntax

SymlinkTextInputFormat Manifest Generation for Presto/Athena read support

Delta's use of MVCC causes external readers (Presto, Hive, etc.) to see inconsistent or duplicate data. The SymlinkTextInputFormat allows these systems to read a set of manifest files, each containing a list of file paths, in order to determine which data files to read (rather than listing the files present on the file system).

This issue tracks adding support for generating these manifest files from the Delta transaction log, automatically after each commit.

This feature would add support for reading Delta tables from Presto. Hive would require some additional work on the Hive side, as Hive does not use the file extension to determine the final InputFormat to use to decode the data (and as such interprets the files incorrectly as text).
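
For reference, later open-source Delta releases (0.5.0 onward) expose this as a generate operation; a sketch assuming the io.delta.tables API, an active SparkSession named spark, and a placeholder path:

import io.delta.tables.DeltaTable

// Sketch: write SymlinkTextInputFormat manifest files for Presto/Athena readers.
DeltaTable.forPath(spark, "/tmp/delta-table")
  .generate("symlink_format_manifest")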

Multi-threading support on Delta tables

We faced an issue where a Parquet table was not compatible with multi-threaded processing (parallel inserts into the table). Is a Delta table compatible with multi-threaded processing (parallel insert/upsert/delete)?

[Storage System] Support for AWS S3 (single cluster/driver)

This is the official issue for tracking support for AWS S3. Specifically, we want to enable Delta Lake to operate on AWS S3 with transactional guarantees when all writes go through a single Spark driver (that is, it must be a single SparkContext in a single Spark driver JVM process).

The major challenges for operating on S3 with transactional guarantees are as follows:

  1. Lack of atomic "put if not present" - Delta Lake's atomic visibility of transactional changes depends on committing to the transaction log atomically by creating a version file X only if it is not present. S3 file system does not provide a way to perform "put if absent", hence multiple concurrent writers can easily commit the same version file multiple times, thus overwriting another set of changes.

  2. Lack of consistent directory listing - Delta Lake relies on file directory listing to find the latest version file in the transactional log. S3 object listing does not provide the guarantee that listing attempts will return all the files written out in a directory. This, coupled with 1. can further lead to overwriting of the same version.

In this issue, we are going to solve the above problems for a single Spark cluster - if all the concurrent writes to a table go through a single cluster, then we can do the necessary locking and tracking latest version needed to avoid the above issues.

[Concurrency Control] Support concurrent writes

As of 0.2.0, Delta Lake supports concurrent appends, where an append must be only adding new data to the table without reading or modifying existing data in any way.

This issue is to track the support of more types of concurrent writes in Delta Lake, where a writer can also read and modify existing data, such as overwrite, delete or update.

Related issues:
#9: Allow concurrent writes to partitions that don't interact with each other
#23: Multi threading support on delta table

How delta do Garbage collection?

Hi, delta team.

We can run a new job to merge the small files in HDFS generated by our Structured Streaming application. But how can I do garbage collection with Delta?


I found the VACUUM command in the Databricks Delta documentation, but I can't find any code for garbage collection in the open-source version.
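
For reference, later open-source releases added a vacuum call on the io.delta.tables API; a hedged sketch with a placeholder path and retention period:

import io.delta.tables.DeltaTable

// Sketch: delete files no longer referenced by the table and older than the retention window.
DeltaTable.forPath(spark, "/tmp/delta-table")
  .vacuum(168) // retention in hours (7 days)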

It seems I have 76 failures

Hello,

I have 76 failures, as in the copy/paste below:

[info] Tests: succeeded 71, failed 76, canceled 0, ignored 0, pending 0
[info] *** 76 TESTS FAILED ***
[error] Failed tests:
[error] org.apache.spark.sql.delta.DeltaRetentionSuite
[error] org.apache.spark.sql.delta.DeltaTimeTravelSuite
[error] org.apache.spark.sql.delta.LocalLogStoreSuite
[error] org.apache.spark.sql.delta.AzureLogStoreSuite
[error] org.apache.spark.sql.delta.HDFSLogStoreSuite
[error] org.apache.spark.sql.delta.DeltaSinkSuite
[error] org.apache.spark.sql.delta.DeltaLogSuite
[error] org.apache.spark.sql.delta.DeltaSuite
[error] org.apache.spark.sql.delta.DeltaSourceSuite
[error] org.apache.spark.sql.delta.EvolvabilitySuite
[error] (test:test) sbt.TestsFailedException: Tests unsuccessful
[error] Total time: 409 s, completed 13 juin 2019 à 16:36:45

Thank you in advance for helping me repair these failures.

Regards.

Dorian ROSSE.

Support user designation of _delta_log location

Hi, this is a feature request to support a user-designated location for the _delta_log folder. While it is possible to create an external table from the parquet files alone, something like Snowflake/Azure Data Warehouse is not able to understand what the _delta_log folder is and errors out when making an external table. Would it be possible to support the ability to designate the save location of the _delta_log so that these tools can read the folder while still being able to take advantage of Delta? I understand this would complicate the read, as you would need to pass both the parquet location and the _delta_log folder, but would it be better to be explicit rather than implicit here?

Maybe make the implicit way the default and allow the user the option to explicitly store these elsewhere?

Can we expose a REST API on Delta tables?

To provide data downstream, we are using a REST API on a NoSQL DB. Since Delta tables support ACID, there is a view to replace the NoSQL DB with a Delta table. Can we point the existing REST API at a Delta table?

[DML] Scala/Java APIs for MERGE

This tracks the ongoing work and discussions for building the Scala/Java APIs for SQL MERGE (that is, upserts) on Delta Lake tables.

Allow concurrent writes to partitions that don't interact with each other

I have a use case where I would like to update multiple partitions of data at the same time but the partitions are totally separate and do not interact with each other.

For example, I would like to run these queries concurrently (which would be very useful in the case of backfills):

spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-01"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-01'")
  .save("s3://calvin/delta-experiments")
spark.read.parquet("s3://jim/dataset/v1/valid-records")
  .filter(col("event_date") === lit("2019-01-02"))
  .write
  .partitionBy("event_date")
  .format("delta")
  .mode(SaveMode.Overwrite)
  .option("replaceWhere", "event_date == '2019-01-02'")
  .save("s3://calvin/delta-experiments")

So the data above being written as Delta belongs to two separate partitions which do not interact with each other. According to the Delta documentation, and in my own experiments, what I get is a com.databricks.sql.transaction.tahoe.ProtocolChangedException: The protocol version of the Delta table has been changed by a concurrent update. Please try the operation again.

Would you support this use-case where you can update partitions concurrently that do not interact with each other?

Parquet seems to allow this just fine (without any corruption if you turn on dynamic partitioning with spark.sql.sources.partitionOverwriteMode). This is a very valid use case if you adhere to Maxime Beauchemin's technique of immutable table partitions.

[Storage System] Support for Azure filesystems

This is the official issue for tracking the work for supporting Azure File System.
Delta Lake 0.1.0 does not support Azure file systems because the default implementation is based on Hadoop's newer FileContext and AbstractFileSystem APIs (which provide atomic renames for HDFS), which Azure does not yet support. The plan is to build a FileSystem-based LogStore implementation.
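
A configuration sketch for when such a LogStore exists; the AzureLogStore class name is taken from the test-suite names that appear elsewhere on this page and the config key follows pre-1.0 naming, so treat both as assumptions:

import org.apache.spark.sql.SparkSession

// Sketch: point Delta at an Azure-specific LogStore implementation.
val spark = SparkSession.builder()
  .config("spark.delta.logStore.class",
    "org.apache.spark.sql.delta.storage.AzureLogStore")
  .getOrCreate()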

Plans to support ADLS gen 2 ?

Since ADLS Gen 2 provides HDFS compatibility, I am thinking it should be possible to use it without the list-after-write issues related to S3.

S3 support for delta

Databricks-managed Delta works with S3. Any plan to make this available in open source?

[DML] Scala/Java API for VACUUM

This tracks the ongoing work and discussions for building the Scala/Java APIs for vacuuming old-version-files and other unnecessary files from the Delta table directory.

[DML] Scala/Java API for OPTIMIZE

This tracks the ongoing work and discussions for building the Scala/Java APIs for optimizing the data layout of Delta tables by coalescing small files into larger files.

saveAsTable doesn't create partitioned data

saveAsTable(...) doesn't lay out partitioned data even though save(...) does.

val df = spark.read.format("parquet").load("/data")
df.write.partitionBy("event_month", "event_day").format("delta").option("path", "/delta_as_table").saveAsTable("event")

For saveAsTable(..), directory structure is flat:

delta_as_table/
├── _delta_log/
├── part-0000...parquet
├── part-0001...parquet
├── part-0002...parquet
├── part-0003...parquet

df.write.partitionBy("event_month", "event_day").format("delta").save("/delta")

For save(..) the directory structure is partitioned

delta/
├── _delta_log/
├── event_month=201812
│   ├── event_day=20181201
│   │   ├── part-0000...parquet
│   │	├── ...
│   ├── event_day=20181202
│   │   ├── part-0000...parquet
│   │	├── ...
│   ├── event_day=20181203
│   │   ├── part-0000...parquet
│   │	├── ...

When I try to select non-table data:
spark.read.format("delta").load("/delta").where("event_month=201812 and event_day=20181201").count

Only Parquet files from the directory /delta/event_month=201812/event_day=20181201 are accessed.

When I do the same for table data:
spark.sql("select count(*) from event where event_month=201812 and event_day=20181201").show

All Parquet files from the "delta_as_table/" directory are accessed (based on the last read time from ls -l --time=atime), which leads me to the conclusion that partition pruning is not applied.

Computed Partition Column Support

Spark now has the ability to specify partition columns that are computed automatically from other columns in the table using a set of fixed transformations. This mechanism is better than manually performing this computation as it preserves the functional dependency between columns, allowing partition filtering to occur in some cases where query predicates only exist against the original value.

For example: Given a table with a column event_time TIMESTAMP and a partition column date DATE, we could perform partition filtering even when there is only a predicate on event_time. Without this feature the user would have to manually write redundant predicates against both columns.

This feature can also be used as the basis for implementing bucketing.
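
For reference, later Delta releases address this kind of derived partition column with generated columns; a sketch assuming the DeltaTableBuilder/columnBuilder API from Delta Lake 1.x (names and signatures should be verified against your release):

import io.delta.tables.DeltaTable

// Sketch: a date partition column computed from event_time, so a predicate on
// event_time can still be turned into a partition filter on date.
DeltaTable.create(spark)
  .tableName("events")
  .addColumn("event_time", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("date")
      .dataType("DATE")
      .generatedAlwaysAs("CAST(event_time AS DATE)")
      .build())
  .partitionedBy("date")
  .execute()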

Delta support for s3

hi Delta community,

Could we add support for eventually consistent blob storage such as AWS S3?

Thanks

Error while writing data frame using s3a file system.

Hi, I'm trying to use Delta Lake over the S3 storage layer.

Spark command: df.write.format("delta").save("s3a://<bucket_name>/<key>")

Getting this error.

19/06/19 02:13:59 WARN DeltaLog: Failed to parse s3a://demo-atlan-source/delta_test/_delta_log/_last_checkpoint. This may happen if there was an error during read operation, or a file appears to be partial. Sleeping and trying again.
org.apache.hadoop.fs.UnsupportedFileSystemException: No AbstractFileSystem for scheme: s3a
	at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:154)
	at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:242)
	at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
	at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
	at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
	at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
	at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:52)
	at org.apache.spark.sql.delta.storage.HDFSLogStore.read(HDFSLogStore.scala:56)
	at org.apache.spark.sql.delta.Checkpoints$class.loadMetadataFromFile(Checkpoints.scala:138)
	at org.apache.spark.sql.delta.Checkpoints$class.lastCheckpoint(Checkpoints.scala:132)
	at org.apache.spark.sql.delta.DeltaLog.lastCheckpoint(DeltaLog.scala:56)
	at org.apache.spark.sql.delta.DeltaLog.<init>(DeltaLog.scala:133)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1$$anonfun$apply$7.apply(DeltaLog.scala:722)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1$$anonfun$apply$7.apply(DeltaLog.scala:722)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1.apply(DeltaLog.scala:721)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3$$anonfun$call$1.apply(DeltaLog.scala:721)
	at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
	at org.apache.spark.sql.delta.DeltaLog$.recordOperation(DeltaLog.scala:623)
	at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
	at org.apache.spark.sql.delta.DeltaLog$.recordDeltaOperation(DeltaLog.scala:623)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:720)
	at org.apache.spark.sql.delta.DeltaLog$$anon$3.call(DeltaLog.scala:718)
	at com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4767)
	at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
	at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
	at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
	at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
	at com.google.common.cache.LocalCache.get(LocalCache.java:3965)
	at com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4764)
	at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:718)
	at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:650)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:139)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at CatalogData$.main(CatalogData.scala:26)
	at CatalogData.main(CatalogData.scala)

I'm using the master build of delta-io. Is the s3a file system not supported in Delta yet, or am I missing something?

Thanks!
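
A hedged workaround sketch: the default HDFSLogStore goes through Hadoop's FileContext API, which has no s3a binding in this environment, while Delta 0.2.0+ ships an S3-specific LogStore that uses the plain FileSystem API instead. The class and config key below follow pre-1.0 naming, and df stands in for an existing DataFrame, so verify both against your build:

import org.apache.spark.sql.SparkSession

// Sketch: use the single-driver S3 LogStore instead of the FileContext-based default.
val spark = SparkSession.builder()
  .config("spark.delta.logStore.class",
    "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
  .getOrCreate()

df.write.format("delta").save("s3a://<bucket_name>/<key>")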

How to process deltas in delta lake?

In Databricks I can use MERGE. That doesn't seem to be supported in the open-source version.

MERGE INTO deltatest 
USING tbl_delta 
ON deltatest.PartitionKey IN ('category1', 'category2') 
AND deltatest.uniqueID = tbl_delta.uniqueID 
WHEN MATCHED THEN UPDATE SET * 
WHEN NOT MATCHED THEN INSERT *

error:

Py4JJavaError: An error occurred while calling o25.sql.
: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'MERGE' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)
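
For reference, later open-source releases added MERGE through a Scala/Java API before the SQL syntax arrived; a sketch assuming the io.delta.tables API, a source DataFrame named tblDelta, and a placeholder table path:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.expr

// Sketch: upsert the source rows into the deltatest Delta table.
DeltaTable.forPath(spark, "/tmp/deltatest").as("deltatest")
  .merge(
    tblDelta.as("tbl_delta"),
    expr("deltatest.PartitionKey IN ('category1', 'category2') AND deltatest.uniqueID = tbl_delta.uniqueID"))
  .whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()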

Error writing streaming data to delta lake

I've created my first Delta Lake table successfully, but I'm having some trouble writing streaming data to it. I generated the table using a set of JSON files and was testing by copying one of those files into the streaming directory after creating the table, but ran into this error: ERROR:log:An error occurred while calling o326.save. The full logs are below. I'm running locally in pseudo-distributed mode with the setup below. Please let me know if any additional details would be helpful!

Java

(mve) [cglaes@blackened : mny : 19:12] java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
(mve) [cglaes@blackened : mny : 19:13] 

Hadoop

(mve) [cglaes@blackened : mny : 19:10] hadoop version
Hadoop 2.7.6
Subversion https://[email protected]/repos/asf/hadoop.git -r 085099c66cf28be31604560c376fa282e69282b8
Compiled by kshvachk on 2018-04-18T01:33Z
Compiled with protoc 2.5.0
From source with checksum 71e2695531cb3360ab74598755d036
This command was run using /home/cglaes/bin/hadoop/share/hadoop/common/hadoop-common-2.7.6.jar
(mve) [cglaes@blackened : mny : 19:10] 

Spark

(mve) [cglaes@blackened : mny : 19:12] pyspark --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/
                        
Using Scala version 2.12.8, OpenJDK 64-Bit Server VM, 1.8.0_191
Branch 
Compiled by user  on 2019-04-18T23:18:13Z
Revision 
Url 
Type --help for more information.
(mve) [cglaes@blackened : mny : 19:12]

Below are the logs from my resource manager

ERROR:log:received empty C json rdd, skipping
INFO:log:transforming C json rdd to DataFrame
INFO:log:writing below streaming forex_quote_log data to delta
+-------+-----------+---------+---------+-------------+----------+
|   pair|exchange_id|ask_price|bid_price|        epoch|      date|
+-------+-----------+---------+---------+-------------+----------+
|HUF/EUR|         48|0.0030891|0.0030825|1557290584000|2019-05-08|
|XAU/JPY|         48| 141550.0| 141449.0|1557290584000|2019-05-08|
|ZAR/BWP|         48|  0.74758|  0.74714|1557290584000|2019-05-08|
|NZD/EUR|         48|  0.58779|  0.58759|1557290587000|2019-05-08|
|HRK/CAD|         48|  0.20356|  0.20353|1557290585000|2019-05-08|
|USD/EUR|         48| 0.892618| 0.892379|1557290585000|2019-05-08|
|BWP/ZAR|         48|   1.3384|   1.3376|1557290584000|2019-05-08|
|TRY/CHF|         48| 0.165202| 0.165084|1557290585000|2019-05-08|
|USD/SGD|         48|  1.36094|  1.36079|1557290587000|2019-05-08|
|NGN/ZAR|         48| 0.040118| 0.040034|1557290586000|2019-05-08|
|NZD/DKK|         48|   4.3885|   4.3868|1557290587000|2019-05-08|
|DKK/EUR|         48|  0.13395|  0.13393|1557290586000|2019-05-08|
|XAG/AUD|         48|   21.338|   21.232|1557290584000|2019-05-08|
|AUD/PLN|         48|  2.68999|  2.68832|1557290586000|2019-05-08|
|ZAR/PKR|         48|   9.8109|   9.8051|1557290584000|2019-05-08|
|THB/SGD|         48| 0.042785| 0.042774|1557290587000|2019-05-08|
|MAD/AUD|         48|  0.14728|  0.14727|1557290585000|2019-05-08|
|JPY/ZAR|         48|  0.13115|  0.13107|1557290584000|2019-05-08|
|DKK/THB|         48|   4.7744|   4.7735|1557290585000|2019-05-08|
|SGD/ZAR|         48|  10.5989|  10.5903|1557290586000|2019-05-08|
+-------+-----------+---------+---------+-------------+----------+
only showing top 20 rows

ERROR:log:An error occurred while calling o326.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 13.0 failed 4 times, most recent failure: Lost task 17.3 in stage 13.0 (TID 188, blackened, executor 1): java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190508044339_155729061998421.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
	at org.apache.spark.sql.delta.Snapshot$.$anonfun$assertLogBelongsToTable$1(Snapshot.scala:216)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2551)
	at org.apache.spark.sql.Dataset.first(Dataset.scala:2558)
	at org.apache.spark.sql.delta.Snapshot.<init>(Snapshot.scala:141)
	at org.apache.spark.sql.delta.DeltaLog.$anonfun$updateInternal$2(DeltaLog.scala:318)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode(DeltaProgressReporter.scala:30)
	at org.apache.spark.sql.delta.util.DeltaProgressReporter.withStatusCode$(DeltaProgressReporter.scala:25)
	at org.apache.spark.sql.delta.DeltaLog.withStatusCode(DeltaLog.scala:59)
	at org.apache.spark.sql.delta.DeltaLog.$anonfun$updateInternal$1(DeltaLog.scala:251)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:75)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:65)
	at org.apache.spark.sql.delta.DeltaLog.recordOperation(DeltaLog.scala:59)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:105)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
	at org.apache.spark.sql.delta.DeltaLog.recordDeltaOperation(DeltaLog.scala:59)
	at org.apache.spark.sql.delta.DeltaLog.updateInternal(DeltaLog.scala:251)
	at org.apache.spark.sql.delta.DeltaLog.$anonfun$update$1(DeltaLog.scala:211)
	at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
	at org.apache.spark.sql.delta.DeltaLog.update(DeltaLog.scala:211)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommit$1(OptimisticTransaction.scala:323)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:181)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommit(OptimisticTransaction.scala:316)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commit$1(OptimisticTransaction.scala:232)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:75)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:65)
	at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:66)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:105)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
	at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:66)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:218)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:216)
	at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:66)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:72)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:146)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: File (file:/data/stream/polygon/C/C_20190508044339_155729061998421.json) doesn't belong in the transaction log at hdfs://localhost:9000/data/lake/polygon/forex_quote_log/_delta_log. Please contact Databricks Support.
	at org.apache.spark.sql.delta.Snapshot$.$anonfun$assertLogBelongsToTable$1(Snapshot.scala:216)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more

First 10 lines of the file in question

(mve) [cglaes@blackened : mny : 19:17] head -n 10 /data/stream/polygon/C/C_20190508044339_155729061998421.json
{"pair": "HUF/EUR", "exchange_id": 48, "ask_price": 0.0030891, "bid_price": 0.0030825, "epoch": 1557290584000}
{"pair": "XAU/JPY", "exchange_id": 48, "ask_price": 141550, "bid_price": 141449, "epoch": 1557290584000}
{"pair": "ZAR/BWP", "exchange_id": 48, "ask_price": 0.74758, "bid_price": 0.74714, "epoch": 1557290584000}
{"pair": "NZD/EUR", "exchange_id": 48, "ask_price": 0.58779, "bid_price": 0.58759, "epoch": 1557290587000}
{"pair": "HRK/CAD", "exchange_id": 48, "ask_price": 0.20356, "bid_price": 0.20353, "epoch": 1557290585000}
{"pair": "USD/EUR", "exchange_id": 48, "ask_price": 0.892618, "bid_price": 0.892379, "epoch": 1557290585000}
{"pair": "BWP/ZAR", "exchange_id": 48, "ask_price": 1.3384, "bid_price": 1.3376, "epoch": 1557290584000}
{"pair": "TRY/CHF", "exchange_id": 48, "ask_price": 0.165202, "bid_price": 0.165084, "epoch": 1557290585000}
{"pair": "USD/SGD", "exchange_id": 48, "ask_price": 1.36094, "bid_price": 1.36079, "epoch": 1557290587000}
{"pair": "NGN/ZAR", "exchange_id": 48, "ask_price": 0.040118, "bid_price": 0.040034, "epoch": 1557290586000}
(mve) [cglaes@blackened : mny : 19:17] 

Could Delta Lake work on CDH Spark 2.3?

My cluster is using Spark 2.3 on CDH 5.13.3. When I try Delta on the cluster, it always gets an error message like the one below:

"data.write.format('delta').save('/tmp/delta-table01')"

"py4j.protocol.Py4JJavaError: An error occurred while calling o62.save.
: com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper$"

However, when I run the same command on standalone-mode Spark 2.4.2, it works well. So if I want to use Delta on the CDH cluster, how should I set up the environment? Thanks.

[Concurrency Control] Support concurrent append-only writes

This is the official issue to track the support of concurrent append-only writes, or appends.

A common workload pattern is to have multiple writers concurrently appending data to the table. A writer is considered append-only, or an append, if it only adds data without reading or modifying the existing data of the Delta Lake table in any way. Concurrent reads and appends should be allowed to operate on the table partitions and get snapshot isolation.

[Storage System] Support for AWS S3 (multiple clusters/drivers/JVMs)

This is the official issue for discussing support for Delta Lake on S3 while writing from multiple clusters. The challenges of S3 support have been explained in #39. While #39 tracks the work for a simpler solution that works only when all write operations go through the same cluster/driver/JVM, this issue tracks the larger problem of making it work with multiple clusters.

Please use this thread to discuss and vote on ideas.

Update 2022-01-13
We have begun working with an open-source contributor on the design + implementation of this feature using DynamoDB to provide the mutual-exclusion that S3 is lacking.

Here's the public design doc.

The current status is:

  • PR feedback document + not-yet-public (still WIP) design doc
  • implement PR feedback
  • refactor base PR to use new storage-dynamodb SBT project
  • refactor base PR's python integration tests cc @allisonport-db
  • refactor to Java
  • refactor-out project to separate module (to isolate AWS dependencies)
  • ease-of-use improvements (e.g. default tables, capacity modes, etc.)
  • potential performance improvements
  • 0th commit DynamoDB-empty-table check
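
For reference, a hedged configuration sketch for this multi-cluster S3 LogStore; the class and configuration key names below come from the design work described above and may differ in the released artifact, so treat them as assumptions:

import org.apache.spark.sql.SparkSession

// Sketch: route s3a log commits through a DynamoDB-backed LogStore for mutual exclusion.
val spark = SparkSession.builder()
  .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
  .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log")
  .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1")
  .getOrCreate()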

New Delta Lake benchmark for Renaissance

We are the people behind the freshly released Renaissance Benchmark Suite (https://renaissance.dev/), and the announcement of Delta Lake's open sourcing caught our attention. Indeed, a Delta Lake benchmark could be a very interesting addition to Renaissance, since performing many ACID transactions on top of Apache Spark looks like a relevant use case with enough abstraction layers and complexity. Moreover, such a benchmark would draw attention to the framework and would also be used by GC or compiler developers to make sure that they properly deal with this kind of workload.

We would be very happy if you could contribute to Renaissance with a workload that you feel is important. Adding such a workload should be a very easy task if you have a minimal demo app. It's not more complicated than adding a Scala class to our apache-spark subproject and writing a runIteration function with the actual workload. See this example of a naive Bayes classifier. Of course, we will be happy to help with the integration if necessary.

Issue on our GitHub repo: renaissance-benchmarks/renaissance#126
