dask-contrib / dask-deltatable
A Delta Lake reader for Dask
License: BSD 3-Clause "New" or "Revised" License
In to_deltalake, there's a kwarg called storage_options:
storage_options : dict[str, str] | None. Default None
Options passed to the native delta filesystem. Unused if 'filesystem' is defined
In read_deltalake, we also have a kwarg called storage_options:
storage_options : dict, default None
Key/value pairs to be passed on to the fsspec backend, if any.
They are not the same options. The equivalent options in read_deltalake are called delta_storage_options:
delta_storage_options : dict, default None
Key/value pairs to be passed on to the delta-rs filesystem, if any.
This is confusing. We should align the two APIs.
Create a function to show the history of the delta table using delta logs.
REF
https://docs.databricks.com/delta/delta-utility.html#language-sql
delta-rs already has DeltaTable.history().
Perhaps we should remove history() from this library and just use what's already built into delta-rs.
When reading data with dask-deltatable and a distributed client, an error occurs:
<dask.highlevelgraph.HighLevelGraph object at 0x13fac3fd0>
0. read-delta-table-f039f9f4fa4ebf9b2dd57eb38dcfa70e
>.
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 350, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 73, in pickle_dumps
frames[0] = pickle.dumps(
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/jbennet/src/dask-deltatable/t6.py", line 25, in <module>
res = ddf_read.compute()
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 310, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 595, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/client.py", line 3207, in get
futures = self._graph_to_futures(
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/client.py", line 3106, in _graph_to_futures
header, frames = serialize(ToPickle(dsk), on_error="raise")
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 372, in serialize
raise TypeError(msg, str(x)[:10000]) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x13fac3fd0>\n 0. read-delta-table-f039f9f4fa4ebf9b2dd57eb38dcfa70e\n>')
Reproducer:
import os
import shutil
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask_deltatable as ddt
from distributed import Client
if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "s1": np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    data_path = "data/t6_data"
    if os.path.exists(data_path):
        shutil.rmtree(data_path)
    ddt.to_deltalake(data_path, ddf)
    client = Client(processes=True)
    try:
        ddf_read = ddt.read_deltalake(data_path)
        res = ddf_read.compute()
        print(f"\n{res}")
    finally:
        client.close()
This library is currently using an old version of deltalake.
Can we bump from deltalake==0.6.3 to deltalake==0.9.0?
DeltaLake does not support timestamps other than datetime64[us]. We can make it easier for the user to write those, however, by converting timestamps with a different resolution to us. The original resolution will not be preserved when reading back; still, it would be a nicer API if users don't have to do the work themselves. We can emit a warning that the dtype is not preserved in this case.
I played around with delta-rs and think that'd be an even easier way to implement the Delta read functionality.
Here's how we can get all the files that need to be read into the Dask DataFrame.
from deltalake import DeltaTable
dt = DeltaTable("tmp/some-delta-pyspark")
dt.files() # list of files
This to_pyarrow_dataset function may be useful. There is also a to_pyarrow_table function.
What do you think @rajagurunath?
We are seeing more folks in the Dask community ask about Delta Lake support. Searching through GitHub there just seems to be this repo and deltadask by @MrPowers.
Given that this repo has the most traction I wondered if you have any interest in migrating this repo to the dask-contrib org on GitHub to incubate the project?
This would allow Dask developers to help you out with maintenance efforts. It also makes it easier for us to promote the project.
What do you think?
Currently in dask-deltatable, we're using pyarrow.dataset.dataset, which we filter with a pyarrow.Expression:
dask-deltatable/dask_deltatable/core.py
Line 78 in dbeb8cc
Would the ParquetDataset be more appropriate here? It can accept filters as Expression, or tuple/DNF form, which would allow us to skip that filters_to_expression step.
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
It would be good to test reading and writing to remote filesystems, such as AWS S3, Google Cloud Storage, Azure. But we would need to set up buckets / credentials for those. Dask / Coiled team could host those buckets. Alternatively, we can ask the author of dask-deltatable if they want to manage that.
cc @fjetter , what do you think?
See also #53.
It should be relatively straightforward to build a Dask Delta writer that outputs the JSON metadata that powers Delta.
Building the whole Delta Engine writer is hard, but outputting the JSON file shouldn't be too bad.
I am thinking this will help a lot because Dask's default metadata file is causing problems. We urgently need a solution to help scale Parquet writes better. Let me know if you have some time to take a stab at this!
It's not possible to write a dataframe to Delta using distributed.Client and custom ParquetFileWriteOptions.
Reproducible example:
import pandas as pd
import dask.dataframe as dd
from distributed import Client
from dask_deltatable import to_deltalake
from pyarrow.dataset import ParquetFileFormat
if __name__ == "__main__":
    client = Client()
    data_dir = "delta-compression"
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    write_options = ParquetFileFormat().make_write_options(compression="snappy")
    ddf = dd.from_pandas(df, npartitions=2)
    to_deltalake(data_dir, ddf, file_options=write_options)
    client.close()
Error:
2023-07-13 17:54:14,273 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x14790eed0>
0. from_pandas-22cf234e462885962d4f4fb8d1d989bf
1. _write_partition-7e92bd9d1f919ad9adc30211e323c963
2. delta-commit
>.
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
^^^^^^^^^^^^^^^^^^^^^^^
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 63, in dumps
result = pickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 68, in dumps
pickler.dump(x)
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 350, in serialize
header, frames = dumps(x, context=context) if wants_context else dumps(x)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 73, in pickle_dumps
frames[0] = pickle.dumps(
^^^^^^^^^^^^^
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 81, in dumps
result = cloudpickle.dumps(x, **dump_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
return Pickler.dump(self, obj)
^^^^^^^^^^^^^^^^^^^^^^^
File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/jbennet/src/dask-notebooks/delta_parquet_options.py", line 14, in <module>
to_deltalake(data_dir, ddf, file_options=write_options)
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/dask_deltatable/write.py", line 225, in to_deltalake
result = result.compute()
^^^^^^^^^^^^^^^^
File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 372, in serialize
raise TypeError(msg, str(x)[:10000]) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 3 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x14790eed0>\n 0. from_pandas-22cf234e462885962d4f4fb8d1d989bf\n 1. _write_partition-7e92bd9d1f919ad9adc30211e323c963\n 2. delta-commit\n>')
def read_delta_table(self, **kwargs) -> dd.core.DataFrame:
    """
    Reads the list of parquet files in parallel
    """
    pq_files = self.get_pq_files()
    if len(pq_files) == 0:
        raise RuntimeError("No Parquet files are available")
    parts = [
        delayed(
            self.read_delta_dataset,
            name="read-delta-table-" + tokenize(self.fs_token, f, **kwargs),
        )(f, **kwargs)
        for f in list(pq_files)
    ]
    meta = self._make_meta_from_schema()
    verify_meta = kwargs.get("verify_meta", False)
    return from_delayed(parts, meta=meta, verify_meta=verify_meta)
This should probably be replaced with a dict and new_dd_collection. This will make things more efficient, especially for larger datasets. I'm hopeful that this is an easy task, maybe a ten-line change. Probably it looks something like ...
dsk = {
    (name, i): (apply, self.read_delta_dataset, [filename], kwargs)
    for i, filename in enumerate(filenames)
}
...
return new_dd_collection(dsk, meta, name, divisions)  # ordering here might be wrong
Glad to see this project is still growing and adding more features.
Some high-level thoughts on dask_deltatable:
delta-rs already has DeltaTable.vacuum(). Is there any reason to have vacuum in this library as well?
delta-rs already has DeltaTable.history(). Can we just use what's already built into delta-rs?
Can we bump from deltalake==0.6.3 to deltalake==0.8.1?
Some suggested next steps:
Currently s3fs and boto3 are required dependencies of dask_deltatable. Can they be put in an s3 or aws optional requirements list?
This diff should do the trick
diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index f833c16..1fa85ac 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -6,7 +6,6 @@ from urllib.parse import urlparse
import dask
import dask.dataframe as dd
import pyarrow.parquet as pq
-from boto3 import Session
from dask.base import tokenize
from dask.dataframe.io import from_delayed
from dask.delayed import delayed
@@ -172,6 +171,8 @@ class DeltaTableWrapper(object):
def _read_from_catalog(
database_name: str, table_name: str, **kwargs
) -> dd.core.DataFrame:
+ from boto3 import Session
+
if ("AWS_ACCESS_KEY_ID" not in os.environ) and (
"AWS_SECRET_ACCESS_KEY" not in os.environ
):
diff --git a/requirements.txt b/requirements.txt
index d12c835..5233228 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,4 @@
dask[dataframe,distribuited]
-boto3
deltalake==0.6.3
fsspec==2022.10.0
pyarrow==10.0.0
-s3fs==2022.10.0
diff --git a/setup.py b/setup.py
index f4428f8..3fe0698 100644
--- a/setup.py
+++ b/setup.py
@@ -5,6 +5,7 @@ from setuptools import setup
with open("README.md", "r", encoding="utf-8") as f:
long_description = f.read()
+
setup(
name="dask-deltatable",
version="0.3rc",
@@ -17,7 +18,10 @@ setup(
long_description_content_type="text/markdown",
python_requires=">=3.7",
install_requires=open("requirements.txt").read().strip().split("\n"),
- extras_require={"dev": ["pytest", "requests", "pytest-cov>=2.10.1"]},
+ extras_require={
+ "dev": ["pytest", "requests", "pytest-cov>=2.10.1"],
+ "s3": ["s3fs==2022.10.0", "boto3"],
+ },
package_data={"dask_deltatable": ["*.pyi" "__init__.pyi", "core.pyi"]},
include_package_data=True,
zip_safe=False,
My sense from reading upstream issues on AWS credentials is that they're not interested in scraping AWS permissions from the various locations that exist (environment variables, config files, API endpoints) and are leaving this to downstream libraries. (example issue: apache/arrow-rs#4238)
We should maybe check to see if the location starts with s3:// and if so open up botocore and get whatever credentials we need from there and add them appropriately to storage_options and delta_storage_options.
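The check described above could be sketched roughly as follows, using only the standard library for the URL test. The helper name add_s3_credentials, the injected lookup callable, and the exact option keys (key/secret/token on the fsspec side, AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY/AWS_SESSION_TOKEN on the delta-rs side) are assumptions for illustration, not this library's API. In practice the lookup might wrap botocore's session credentials.

```python
from typing import Callable, Dict, Optional, Tuple
from urllib.parse import urlparse

# Hypothetical credential record: (access_key, secret_key, session_token)
Credentials = Tuple[str, str, Optional[str]]


def add_s3_credentials(
    path: str,
    lookup: Callable[[], Optional[Credentials]],
    storage_options: Optional[Dict[str, str]] = None,
    delta_storage_options: Optional[Dict[str, str]] = None,
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Return copies of both option dicts, filled in for s3:// paths only."""
    fs_opts = dict(storage_options or {})
    delta_opts = dict(delta_storage_options or {})
    if urlparse(path).scheme != "s3":
        return fs_opts, delta_opts  # non-S3 locations are left untouched

    creds = lookup()
    if creds is None:
        return fs_opts, delta_opts  # nothing found; let the backends decide

    access_key, secret_key, token = creds
    # setdefault keeps anything the user passed explicitly.
    # The key names below are assumptions about what each backend expects.
    fs_opts.setdefault("key", access_key)
    fs_opts.setdefault("secret", secret_key)
    delta_opts.setdefault("AWS_ACCESS_KEY_ID", access_key)
    delta_opts.setdefault("AWS_SECRET_ACCESS_KEY", secret_key)
    if token:
        fs_opts.setdefault("token", token)
        delta_opts.setdefault("AWS_SESSION_TOKEN", token)
    return fs_opts, delta_opts


# Usage with a fake lookup; a real one might read from botocore instead.
fs_opts, delta_opts = add_s3_credentials(
    "s3://bucket_name/delta_path",
    lambda: ("AKIAEXAMPLE", "not-a-real-secret", None),
)
```

Injecting the lookup keeps botocore an optional import and makes the behaviour easy to test without real credentials.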
Lots of exciting progress has been made here lately. Do we want to think about doing a release here soon?
I'm following the README instructions to install dask-deltatable from conda-forge. It fails with the following error message:
UnsatisfiableError: The following specifications were found to be incompatible with each other:
Output in format: Requested package -> Available versions
The following specifications were found to be incompatible with your system:
- feature:/osx-arm64::__osx==12.7.1=0
- feature:|@/osx-arm64::__osx==12.7.1=0
- dask-deltatable -> pyarrow -> __osx[version='>=10.9']
Your installed version is: 12.7.1
Seems strange since clearly my osx version satisfies the >=10.9 condition. Will investigate further but leaving it here in case others run into the same problem.
I'm running on an M1 Mac with python==3.9
[ EDIT ]
Creating a conda env and running conda install -c conda-forge dask-deltatable fails with a similar error
The initial API for writing Delta Lake is a little bit clunky for the user.
When reading, users have to do something like this:
from dask_deltatable import read_delta_table
ddf = read_delta_table("path_to_table")
To write, they need this:
from dask_deltatable.write import to_deltalake
out = to_deltalake("path_to_table", ddf)
out.compute()
TODO:
Naming: read_delta_table vs to_deltalake. Either of the following combos would be more consistent:
read_delta_table/write_delta_table
read_deltalake/write_deltalake
read_delta_table/to_delta_table
read_deltalake/to_deltalake
to_deltalake should be exposed on top level, same as read_delta_table
Don't require compute as an extra step, add a compute: bool kwarg instead
We can't currently write categoricals to DeltaLake.
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask_deltatable.write import to_deltalake
if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "c1": pd.Series(np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100), dtype="category"),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    to_deltalake("t4_data", ddf).compute()
This raises:
Traceback (most recent call last):
File "/Users/jbennet/src/dask-deltatable/t4.py", line 14, in <module>
to_deltalake("t4_data", ddf).compute()
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 310, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 595, in compute
results = schedule(dsk, keys, **kwargs)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/threaded.py", line 89, in get
results = get_async(
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 511, in get_async
raise_exception(exc, tb)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 319, in reraise
raise exc
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 224, in execute_task
result = _execute_task(task, data)
File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/Users/jbennet/src/dask-deltatable/dask_deltatable/write.py", line 180, in _commit
_write_new_deltalake(
Exception: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8)
Pyarrow supports writing categories; delta-rs does not.
Integrate the Delta Acceptance Testing reader tests.
Here are the DAT tests. We can probably just copy them over with minor modifications.
I noticed that the requirements for deltalake, fsspec, pyarrow, and s3fs are pinned exactly. This makes it challenging for users to use dask_deltatable in other environments which might require slightly different versions of those libraries.
Is it possible to relax those pins, or do they need to be pinned exactly?
First of all, thanks for the effort.
I am evaluating Dask for my scientific computation, which reads a Delta table and writes results as a Delta table. The README says there is a limitation when writing, but it is unclear what it is. Can you please share some details?
Note - I also saw this page where they explain using Pandas, but I am unsure if I can use write_deltalake in Dask.
The last example in README says:
ddt.to_deltalake(df, "s3://bucket_name/delta_path")
However, this does not seem compatible with the latest version of dask-deltatable. It should be instead:
ddt.to_deltalake("s3://bucket_name/delta_path", df)
or to keep the initial order of arguments:
ddt.to_deltalake(df=df, table_or_uri="s3://bucket_name/delta_path")
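To see why the README call breaks, here is a tiny stand-in with the argument order described above (path first, dataframe second). to_deltalake_stub is hypothetical and does no I/O; it only mirrors the parameter names table_or_uri and df mentioned in this issue.

```python
def to_deltalake_stub(table_or_uri, df):
    """Hypothetical stand-in: accepts the path first, then the dataframe."""
    if not isinstance(table_or_uri, str):
        raise TypeError("table_or_uri must be a path or URI string")
    return {"table_or_uri": table_or_uri, "rows": len(df)}


df = [{"a": 1}, {"a": 2}]  # placeholder for a Dask DataFrame
uri = "s3://bucket_name/delta_path"

# Path first works, and so do keywords in any order.
positional = to_deltalake_stub(uri, df)
keywords = to_deltalake_stub(df=df, table_or_uri=uri)

# The old README order passes the dataframe where the path is expected.
try:
    to_deltalake_stub(df, uri)
    swapped_ok = True
except TypeError:
    swapped_ok = False
```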
We need end-to-end test cases that write to remote filesystems: s3, gcsfs, etc. Make sure this is also done with the distributed scheduler.
I have a delta dataset that's partitioned by a couple of columns. Currently, if a user uses filter to select data from a subset of those columns, dask-deltatable will still make a dask dataframe partition for each file in the original dataset. Here's an example (this should be runnable, as long as you have requests and adlfs)
import os
import dask_deltatable
import requests
token = requests.get("https://planetarycomputer.microsoft.com/api/sas/v1/token/ms-buildings").json()["token"]
storage_options = {"account_name": "bingmlbuildings", "sas_token": token}
df = dask_deltatable.read_delta_table(
"az://footprints/delta/2023-04-25/ml-buildings.parquet",
storage_options=storage_options, delta_storage_options=storage_options,
filter=[("RegionName", "=", "Turkey")]
)
print(df)
Which prints out
Dask DataFrame Structure:
geometry meanHeight RegionName quadkey
npartitions=26028
object float32 object int32
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: from-delayed, 26029 graph layers
It seems that DeltaTable.file_uris can take the filter argument and correctly filter out partitions that don't match the filter. With a small change to dask-deltatable, we can forward that from the user to DeltaTable:
diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index 383ded4..c22ddca 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -6,7 +6,7 @@ from urllib.parse import urlparse
import dask
import dask.dataframe as dd
import pyarrow.parquet as pq
-from boto3 import Session
+# from boto3 import Session
from dask.base import tokenize
from dask.dataframe.io import from_delayed
from dask.delayed import delayed
@@ -145,7 +145,7 @@ class DeltaTableWrapper(object):
]
dask.compute(parts)[0]
- def get_pq_files(self) -> List[str]:
+ def get_pq_files(self, partition_filters=None) -> List[str]:
"""
get the list of parquet files after loading the
current datetime version
@@ -154,13 +154,14 @@ class DeltaTableWrapper(object):
if self.datetime is not None:
self.dt.load_with_datetime(self.datetime)
- return self.dt.file_uris()
+ return self.dt.file_uris(partition_filters=partition_filters)
def read_delta_table(self, **kwargs) -> dd.core.DataFrame:
"""
Reads the list of parquet files in parallel
"""
- pq_files = self.get_pq_files()
+ filter = kwargs.get("filter")
+ pq_files = self.get_pq_files(partition_filters=filter)
if len(pq_files) == 0:
raise RuntimeError("No Parquet files are available")
parts = [
Now if I run that, you'll see the Dask DataFrame only has 266 partitions.
Dask DataFrame Structure:
geometry meanHeight RegionName quadkey
npartitions=266
object float32 object int32
... ... ... ...
... ... ... ... ...
... ... ... ...
... ... ... ...
Dask Name: from-delayed, 267 graph layers
I'll make a PR with this shortly.
I went to use mode="overwrite" with to_deltalake and saw that it isn't implemented quite yet
dask-deltatable/dask_deltatable/write.py
Lines 187 to 189 in cbe085a
Looking at the comment there, I'm not sure exactly what additional checks are still needed.
It would be nice to have this package on conda-forge as well
To provide more flexibility when reading data, particularly for datatypes that are not yet native to Python (e.g. Decimals), it would be helpful to support the pyarrow types_mapper kwarg to grant users more control over how they want their data to be read.
In dask-deltatable, when calling dd.read_parquet, perhaps we can reuse the metadata already preserved in delta json, instead of collecting it from parquet files all over again.
Here:
dask-deltatable/dask_deltatable/core.py
Line 196 in cd731a9
It looks like dd.read_parquet will have to go through the parquet files to read the metadata, but the DeltaTable should have all that info already.
I did a quick test reading DeltaLake data in a notebook on a Coiled cluster from s3, with dd.read_parquet vs ddt.read_deltalake.
Cluster: https://cloud.coiled.io/clusters/245026/information?account=dask-engineering.
Data is located in s3://coiled-datasets/delta/.
Results:
dataset | computation | timing (read_parquet) | timing (read_deltalake) |
---|---|---|---|
ds20f_100M | ddf["int1"].sum().compute() | CPU times: user 43.5 ms, sys: 10.7 ms, total: 54.2 ms, Wall time: 8.04 s | CPU times: user 159 ms, sys: 38.4 ms, total: 198 ms, Wall time: 55.3 s |
ds20f_100M | ddf.describe().compute() | CPU times: user 256 ms, sys: 28.7 ms, total: 284 ms, Wall time: 20.7 s | CPU times: user 380 ms, sys: 60.7 ms, total: 441 ms, Wall time: 1min 10s |
ds25f_250M | ddf["int1"].sum().compute() | CPU times: user 67.1 ms, sys: 15.6 ms, total: 82.7 ms, Wall time: 16.7 s | CPU times: user 666 ms, sys: 176 ms, total: 842 ms, Wall time: 3min 59s |
ds25f_250M | ddf.describe().compute() | CPU times: user 605 ms, sys: 70.3 ms, total: 675 ms, Wall time: 1min 10s | CPU times: user 1.02 s, sys: 181 ms, total: 1.2 s, Wall time: 4min 2s |
ds50f_500M | ddf["int1"].sum().compute() | CPU times: user 204 ms, sys: 49.2 ms, total: 253 ms, Wall time: 1min 2s | CPU times: user 2.93 s, sys: 626 ms, total: 3.56 s, Wall time: 16min 46s |
ds50f_500M | ddf.describe().compute() | CPU times: user 3.59 s, sys: 383 ms, total: 3.97 s, Wall time: 5min 53s | killed before finished |
This doesn't look good, and needs looking into.
I saw this example and thought it might be a better way to load data.
Right now it looks like you rely on being able to just read from the Parquet files and load the partition values from HIVE-style directories. This isn't robust in two ways:
In the future, it would be best not to rely on reading from the file URIs and instead read from the dataset fragments, which will provide the correct data as the Delta Protocol continues to evolve.
To reproduce:
mamba install python==3.11 pip
pip install deltalake==0.14
pip install dask-deltatable
import dask_deltatable
ImportError: cannot import name '_write_new_deltalake' from 'deltalake.writer' (/Users/rpelgrim/miniforge3/envs/deltalake-0130/lib/python3.11/site-packages/deltalake/writer.py)
To solve:
pip install deltalake==0.13
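Until the incompatibility is fixed, a defensive check before importing can give a clearer message. This sketch uses importlib.metadata from the standard library; the helper names and the 0.14 cutoff are assumptions taken from this report only (0.13 works, 0.14 fails), not an official compatibility matrix.

```python
from importlib import metadata
from typing import Optional, Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn '0.13.0' into (0, 13, 0); stops at the first non-numeric part."""
    parts = []
    for piece in text.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def is_known_good(version: str, first_bad: Tuple[int, ...] = (0, 14)) -> bool:
    """True if `version` is older than the first release reported as failing."""
    return parse_version(version) < first_bad


def installed_deltalake_version() -> Optional[str]:
    """Return the installed deltalake version, or None if it is not installed."""
    try:
        return metadata.version("deltalake")
    except metadata.PackageNotFoundError:
        return None


found = installed_deltalake_version()
if found is not None and not is_known_good(found):
    print(f"deltalake {found} may be too new for this dask-deltatable; try 0.13")
```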
I've stored a bunch of data partitioned by date, and written it to delta using the deltalake package like so:
for df in dfs:
    write_deltalake("mytable", df, partition_by="date")
(although actually this was done in parallel, and so things maybe got written out of order)
├── date=2024-01-01
│ ├── part-00001-5c3d1646-6a8b-4511-87f5-3cd0acf1c0e8-c000.zstd.parquet
│ ├── part-00001-869909e1-1079-49db-83d6-77a43d67370a-c000.zstd.parquet
│ └── part-00001-eeb823e8-b9ed-49bd-86d9-9f28f0f444b7-c000.zstd.parquet
└── date=2024-01-02
├── part-00001-32e6c973-6d2a-4132-82dd-e6b431cf5343-c000.zstd.parquet
├── part-00001-426c2af2-6e86-4cfe-86c9-853f243c35e6-c000.zstd.parquet
├── part-00001-6fa10ba1-6b14-4908-b328-f8a8fdaec258-c000.zstd.parquet
└── part-00001-79cd49d4-dee1-4957-8cdf-86b6f86f95f6-c000.zstd.parquet
When I go to read it I find that the data isn't sorted by partition
df = ddt.read_deltalake("mytable")
df.date.head()
0 2024-01-02
1 2024-01-02
2 2024-01-02
3 2024-01-02
4 2024-01-02
Name: date, dtype: date32[day][pyarrow]
df.date.tail()
1392523 2024-01-01
1392524 2024-01-01
1392525 2024-01-01
1392526 2024-01-01
1392527 2024-01-01
Name: date, dtype: date32[day][pyarrow]
We should order things if we can, I think. I propose the following:
"stats" attribute of the deltalake metadata
Probably both the effort and uncertainty increase as we go down that list. The first item seems pretty straightforward to me though.
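As a rough illustration of ordering by partition value, the file list could be sorted on the values parsed from Hive-style key=value path segments before the Dask partitions are built. This is only a sketch under that layout assumption: partition_values and sort_by_partition are hypothetical helpers, the file names are shortened versions of the ones listed above, and values are compared as strings (which happens to work for ISO dates).

```python
from typing import Dict, List, Sequence


def partition_values(path: str) -> Dict[str, str]:
    """Extract key=value directory segments from a Hive-style file path."""
    values = {}
    for segment in path.split("/")[:-1]:  # skip the file name itself
        key, sep, value = segment.partition("=")
        if sep and key:
            values[key] = value
    return values


def sort_by_partition(paths: Sequence[str], columns: Sequence[str]) -> List[str]:
    """Order paths by the given partition columns; the sort is stable."""
    return sorted(
        paths,
        key=lambda p: tuple(partition_values(p).get(c, "") for c in columns),
    )


files = [
    "mytable/date=2024-01-02/part-00001-32e6c973.zstd.parquet",
    "mytable/date=2024-01-01/part-00001-5c3d1646.zstd.parquet",
    "mytable/date=2024-01-02/part-00001-426c2af2.zstd.parquet",
    "mytable/date=2024-01-01/part-00001-869909e1.zstd.parquet",
]
ordered = sort_by_partition(files, ["date"])
```

Sorting paths only orders data between partitions; ordering rows inside a partition would need something else, such as file statistics.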
When using the deltalake=0.16.0 release (released yesterday), I get the following error when attempting to import dask_deltatable
Traceback (most recent call last):
File "/Users/james/projects/coiled/etl-tpch/workflow.py", line 8, in <module>
from pipeline.reduce import query_reduce
File "/Users/james/projects/coiled/etl-tpch/pipeline/reduce.py", line 6, in <module>
import dask_deltatable as ddt
File "/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/dask_deltatable/__init__.py", line 9, in <module>
from .write import to_deltalake as to_deltalake
File "/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/dask_deltatable/write.py", line 18, in <module>
from deltalake.writer import (
ImportError: cannot import name 'MAX_SUPPORTED_WRITER_VERSION' from 'deltalake.writer' (/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/deltalake/writer.py)
Hey folks! Amazing work on the query planning functionality with dask-expr :)
I was just running some deltalake queries and it seems like this upgrade is interfering with read_deltalake.
I'm getting a NotImplementedError: dask_expr does not support a token argument. when running the code snippet below:
from deltalake import write_deltalake
import dask_deltatable as ddt
import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame({
    "group": [1, 1, 2, 2, 3, 3, 4, 4],
    "num": list(range(8)),
    "letter": ["a", "b", "c", "d", "e", "f", "g", "h"],
})
write_deltalake("some-table", df, partition_by="group")
delta_path = "some-table"
ddf = ddt.read_deltalake(delta_path)
Dask==2024.2.1
import dask
dask.config.set({'dataframe.query-planning': False})
EDIT
I've tried upgrading dask-expr as suggested here. This upgrades Dask to 2024.4.1 but does not fix the issue for me.
Implement writing and reading back Dask dataframe index.
Right now, after reading back, index will become just another column:
from dask.datasets import timeseries
from dask_deltatable import read_delta_table
from dask_deltatable.write import to_deltalake
if __name__ == "__main__":
    ddf = timeseries("2023-07-10", "2023-07-12", freq="1H")
    print(f"\nOriginal:\n{ddf.head()}")
    ddf.index = ddf.index.astype("datetime64[us]")
    to_deltalake("t2_data", ddf).compute()
    ddf2 = read_delta_table("t2_data")
    print(f"\nRead back:\n{ddf2.head()}")
Output:
Original:
name id x y
timestamp
2023-07-10 00:00:00 Patricia 989 0.748295 0.513837
2023-07-10 01:00:00 Jerry 993 0.487847 -0.493534
2023-07-10 02:00:00 Victor 1035 0.851038 -0.941464
2023-07-10 03:00:00 Sarah 966 0.878928 -0.087768
2023-07-10 04:00:00 Quinn 983 0.712967 -0.977796
Read back:
name id x y timestamp
0 Patricia 989 0.748295 0.513837 2023-07-10 00:00:00
1 Jerry 993 0.487847 -0.493534 2023-07-10 01:00:00
2 Victor 1035 0.851038 -0.941464 2023-07-10 02:00:00
3 Sarah 966 0.878928 -0.087768 2023-07-10 03:00:00
4 Quinn 983 0.712967 -0.977796 2023-07-10 04:00:00
Original PR: #29.
delta-rs already has DeltaTable.vacuum(). We should consider removing vacuum from this library if it's not needed.
The first version of writing into deltalake did not implement overwriting an existing table. Currently, this is raising a ValueError (should actually raise a NotImplementedError):
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask_deltatable.write import to_deltalake
if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "s1": np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    to_deltalake("t1_data", ddf, mode="overwrite").compute()
Raises:
Traceback (most recent call last):
File "/Users/jbennet/src/dask-deltatable/t1.py", line 14, in <module>
to_deltalake("t1_data", ddf, mode="overwrite").compute()
File "/Users/jbennet/src/dask-deltatable/dask_deltatable/write.py", line 82, in to_deltalake
raise ValueError(
ValueError: Schema of data does not match table schema
Table schema:
None
Data Schema:
i1: int64
f1: double
s1: string
Original PR: #29.
Make sure that to_deltalake works with different partition_freq, add test cases to test_roundtrip.