dask-deltatable's People

Contributors

avriiil, charlesbluca, fjetter, graingert, itsolgood, j-bennet, jacobtomlinson, jrbourbeau, milesgranger, mrocklin, rajagurunath, tomaugspurger


dask-deltatable's Issues

`storage_options` inconsistency between `read_deltalake` and `to_deltalake`

In to_deltalake, there's a kwarg called storage_options:

    storage_options : dict[str, str] | None. Default None
        Options passed to the native delta filesystem. Unused if 'filesystem' is defined

In read_deltalake, we also have a kwarg called storage_options:

    storage_options : dict, default None
        Key/value pairs to be passed on to the fsspec backend, if any.

They are not the same options. The same options in read_deltalake are called delta_storage_options:

    delta_storage_options : dict, default None
        Key/value pairs to be passed on to the delta-rs filesystem, if any.

This is confusing. We should align the two APIs.
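
One possible alignment, sketched below as a proposal rather than the current API: both functions would use storage_options for the fsspec layer and delta_storage_options for the delta-rs filesystem.

ddf = read_deltalake(path, storage_options=fsspec_opts, delta_storage_options=delta_opts)
to_deltalake(path, ddf, storage_options=fsspec_opts, delta_storage_options=delta_opts)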

`TypeError`: cannot pickle `builtins.RawDeltaTable` object

When reading data with dask-deltatable and a distributed client, the following error occurs:

<dask.highlevelgraph.HighLevelGraph object at 0x13fac3fd0>
 0. read-delta-table-f039f9f4fa4ebf9b2dd57eb38dcfa70e
>.
Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle 'builtins.RawDeltaTable' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
TypeError: cannot pickle 'builtins.RawDeltaTable' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'builtins.RawDeltaTable' object
Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
TypeError: cannot pickle 'builtins.RawDeltaTable' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
TypeError: cannot pickle 'builtins.RawDeltaTable' object

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 350, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 73, in pickle_dumps
    frames[0] = pickle.dumps(
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'builtins.RawDeltaTable' object

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/jbennet/src/dask-deltatable/t6.py", line 25, in <module>
    res = ddf_read.compute()
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 310, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 595, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/client.py", line 3207, in get
    futures = self._graph_to_futures(
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/client.py", line 3106, in _graph_to_futures
    header, frames = serialize(ToPickle(dsk), on_error="raise")
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/distributed/protocol/serialize.py", line 372, in serialize
    raise TypeError(msg, str(x)[:10000]) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 1 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x13fac3fd0>\n 0. read-delta-table-f039f9f4fa4ebf9b2dd57eb38dcfa70e\n>')

Reproducer:

import os
import shutil
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask_deltatable as ddt
from distributed import Client


if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "s1": np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    data_path = "data/t6_data"
    if os.path.exists(data_path):
        shutil.rmtree(data_path)
    ddt.to_deltalake(data_path, ddf)

    client = Client(processes=True)
    try:
        ddf_read = ddt.read_deltalake(data_path)
        res = ddf_read.compute()
        print(f"\n{res}")
    finally:
        client.close()

Bump deltalake version

This library is currently using an old version of deltalake.

Can we bump from deltalake==0.6.3 to deltalake==0.9.0?

Handle timestamps other than `datetime64[us]`

Delta Lake does not support timestamps other than datetime64[us]. We can make it easier for users to write such data by converting timestamps with a different resolution to microseconds. The original resolution will not be preserved when reading back, but it would still be a nicer API if users didn't have to do the conversion themselves. We could emit a warning when a dtype is not preserved in this case.
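
A minimal sketch of the proposed coercion, assuming pandas 2.x and naive (timezone-free) datetime columns; the helper name is illustrative, not an existing API:

import warnings

import pandas as pd


def _coerce_timestamps_to_us(df: pd.DataFrame) -> pd.DataFrame:
    """Downcast non-microsecond datetime columns to datetime64[us] before writing."""
    out = df.copy()
    for col in out.columns:
        dtype = out[col].dtype
        is_naive_datetime = str(dtype).startswith("datetime64") and getattr(dtype, "tz", None) is None
        if is_naive_datetime and str(dtype) != "datetime64[us]":
            warnings.warn(f"Column {col!r} has dtype {dtype}; converting to datetime64[us] for Delta Lake")
            out[col] = out[col].astype("datetime64[us]")
    return out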

Try implementing this lib with delta-rs

I played around with delta-rs and think that'd be an even easier way to implement the Delta read functionality.

Here's how we can get all the files that need to be read into the Dask DataFrame.

from deltalake import DeltaTable

dt = DeltaTable("tmp/some-delta-pyspark")
dt.files() # list of files

This to_pyarrow_dataset function may be useful. There is also a to_pyarrow_table function.
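A rough sketch of what those helpers give us (both are eager reads, so they are best suited to schema inspection or small tables; dt.files() above is what gives us per-file parallelism for Dask):

from deltalake import DeltaTable

dt = DeltaTable("tmp/some-delta-pyspark")
dataset = dt.to_pyarrow_dataset()  # pyarrow.dataset.Dataset over the table's current files
table = dt.to_pyarrow_table()      # reads the whole table into a pyarrow.Table in memory
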

What do you think @rajagurunath?

Migrate to dask-contrib?

We are seeing more folks in the Dask community ask about Delta Lake support. Searching through GitHub there just seems to be this repo and deltadask by @MrPowers.

Given that this repo has the most traction I wondered if you have any interest in migrating this repo to the dask-contrib org on GitHub to incubate the project?

This would allow Dask developers to help you out with maintenance efforts. It also makes it easier for us to promote the project.

What do you think?

Can we get rid of `filters_to_expression`?

Currently in dask-deltatable, we're using pyarrow.dataset.dataset, which we filter with a pyarrow.Expression:

.to_table(filter=filter_expression, columns=self.columns)

Would ParquetDataset be more appropriate here? It can accept filters either as an Expression or in tuple/DNF form, which would allow us to skip that filters_to_expression step.

https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetDataset.html
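
For illustration, a sketch of how that could look, assuming file_uris is the file list from the Delta log; the filter column and column names here are placeholders:

import pyarrow.parquet as pq

dataset = pq.ParquetDataset(file_uris, filters=[("year", "=", 2023)])  # DNF filters, no conversion step
table = dataset.read(columns=["a", "b"])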

Credentials for remote filesystems?

It would be good to test reading and writing to remote filesystems, such as AWS S3, Google Cloud Storage, Azure. But we would need to set up buckets / credentials for those. Dask / Coiled team could host those buckets. Alternatively, we can ask the author of dask-deltatable if they want to manage that.

cc @fjetter , what do you think?

See also #53.

Create Dask Delta writer

It should be relatively straightforward to build a Dask Delta writer that outputs the JSON metadata that powers Delta.

Building the whole Delta Engine writer is hard, but outputting the JSON file shouldn't be too bad.

I am thinking this will help a lot because Dask's default metadata file is causing problems. We urgently need a solution to help scale Parquet writes better. Let me know if you have some time to take a stab at this!
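
For reference, a heavily simplified sketch of what the first commit in a Delta log contains, per the Delta protocol; field values here are illustrative, and in practice delta-rs (the deltalake package) writes this for us:

import json
import time
import uuid
from pathlib import Path


def write_first_commit(table_path, parquet_files, schema_string):
    """parquet_files: list of {"path": ..., "size": ...} for already-written data files."""
    log_dir = Path(table_path) / "_delta_log"
    log_dir.mkdir(parents=True, exist_ok=True)
    now_ms = int(time.time() * 1000)
    actions = [
        {"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}},
        {
            "metaData": {
                "id": str(uuid.uuid4()),
                "format": {"provider": "parquet", "options": {}},
                "schemaString": schema_string,  # JSON-serialized Delta schema
                "partitionColumns": [],
                "configuration": {},
                "createdTime": now_ms,
            }
        },
    ]
    for f in parquet_files:
        actions.append(
            {
                "add": {
                    "path": f["path"],
                    "partitionValues": {},
                    "size": f["size"],
                    "modificationTime": now_ms,
                    "dataChange": True,
                }
            }
        )
    # Commit files are named with a 20-digit zero-padded version, one JSON action per line.
    with open(log_dir / f"{0:020d}.json", "w") as fh:
        fh.write("\n".join(json.dumps(a) for a in actions))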

Pickle error with `ParquetFileWriteOptions` and `distributed.Client`

It's not possible to write a dataframe to Delta using distributed.Client and custom ParquetFileWriteOptions.

Reproducible example:

import pandas as pd
import dask.dataframe as dd
from distributed import Client
from dask_deltatable import to_deltalake
from pyarrow.dataset import ParquetFileFormat


if __name__ == "__main__":
    client = Client()
    data_dir = "delta-compression"
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    write_options = ParquetFileFormat().make_write_options(compression="snappy")
    ddf = dd.from_pandas(df, npartitions=2)
    to_deltalake(data_dir, ddf, file_options=write_options)
    client.close()

Error:

2023-07-13 17:54:14,273 - distributed.protocol.pickle - ERROR - Failed to serialize <ToPickle: HighLevelGraph with 3 layers.
<dask.highlevelgraph.HighLevelGraph object at 0x14790eed0>
 0. from_pandas-22cf234e462885962d4f4fb8d1d989bf
 1. _write_partition-7e92bd9d1f919ad9adc30211e323c963
 2. delta-commit
>.
Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling
Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 63, in dumps
    result = pickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 68, in dumps
    pickler.dump(x)
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 350, in serialize
    header, frames = dumps(x, context=context) if wants_context else dumps(x)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 73, in pickle_dumps
    frames[0] = pickle.dumps(
                ^^^^^^^^^^^^^
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/pickle.py", line 81, in dumps
    result = cloudpickle.dumps(x, **dump_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/cloudpickle/cloudpickle_fast.py", line 632, in dump
    return Pickler.dump(self, obj)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in pyarrow._dataset_parquet.ParquetFileWriteOptions.__reduce_cython__
TypeError: self.c_options,self.parquet_options,self.wrapped cannot be converted to a Python object for pickling

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/jbennet/src/dask-notebooks/delta_parquet_options.py", line 14, in <module>
    to_deltalake(data_dir, ddf, file_options=write_options)
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/dask_deltatable/write.py", line 225, in to_deltalake
    result = result.compute()
             ^^^^^^^^^^^^^^^^
  File "/Users/jbennet/mambaforge/envs/notebook-delta/lib/python3.11/site-packages/distributed/protocol/serialize.py", line 372, in serialize
    raise TypeError(msg, str(x)[:10000]) from exc
TypeError: ('Could not serialize object of type HighLevelGraph', '<ToPickle: HighLevelGraph with 3 layers.\n<dask.highlevelgraph.HighLevelGraph object at 0x14790eed0>\n 0. from_pandas-22cf234e462885962d4f4fb8d1d989bf\n 1. _write_partition-7e92bd9d1f919ad9adc30211e323c963\n 2. delta-commit\n>')

Avoid using delayed when making dask dataframe

    def read_delta_table(self, **kwargs) -> dd.core.DataFrame:
        """
        Reads the list of parquet files in parallel
        """
        pq_files = self.get_pq_files()
        if len(pq_files) == 0:
            raise RuntimeError("No Parquet files are available")
        parts = [
            delayed(
                self.read_delta_dataset,
                name="read-delta-table-" + tokenize(self.fs_token, f, **kwargs),
            )(f, **kwargs)
            for f in list(pq_files)
        ]
        meta = self._make_meta_from_schema()
        verify_meta = kwargs.get("verify_meta", False)
        return from_delayed(parts, meta=meta, verify_meta=verify_meta)

This should probably be replaced with a dict and new_dd_collection. This will make things more efficient, especially for larger datasets. I'm hopeful that this is an easy task, maybe a ten-line change. Probably it looks something like ...

dsk = {
    (name, i): (apply, self.read_delta_dataset, [filename], kwargs)
    for i, filename in enumerate(filenames)
}
...
return new_dd_collection(dsk, meta, name, divisions)  # ordering here might be wrong

Code review comments & next steps for this project

Glad to see this project is still growing and adding more features.

Some high-level thoughts on dask_deltatable:

Some suggested next steps:

Make s3fs / boto optional

Currently s3fs and boto3 are required dependencies of dask_deltatable. Can they be put in an s3 or aws optional requirements list?

This diff should do the trick

diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index f833c16..1fa85ac 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -6,7 +6,6 @@ from urllib.parse import urlparse
 import dask
 import dask.dataframe as dd
 import pyarrow.parquet as pq
-from boto3 import Session
 from dask.base import tokenize
 from dask.dataframe.io import from_delayed
 from dask.delayed import delayed
@@ -172,6 +171,8 @@ class DeltaTableWrapper(object):
 def _read_from_catalog(
     database_name: str, table_name: str, **kwargs
 ) -> dd.core.DataFrame:
+    from boto3 import Session
+
     if ("AWS_ACCESS_KEY_ID" not in os.environ) and (
         "AWS_SECRET_ACCESS_KEY" not in os.environ
     ):
diff --git a/requirements.txt b/requirements.txt
index d12c835..5233228 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,4 @@
 dask[dataframe,distribuited]
-boto3
 deltalake==0.6.3
 fsspec==2022.10.0
 pyarrow==10.0.0
-s3fs==2022.10.0
diff --git a/setup.py b/setup.py
index f4428f8..3fe0698 100644
--- a/setup.py
+++ b/setup.py
@@ -5,6 +5,7 @@ from setuptools import setup
 with open("README.md", "r", encoding="utf-8") as f:
     long_description = f.read()
 
+
 setup(
     name="dask-deltatable",
     version="0.3rc",
@@ -17,7 +18,10 @@ setup(
     long_description_content_type="text/markdown",
     python_requires=">=3.7",
     install_requires=open("requirements.txt").read().strip().split("\n"),
-    extras_require={"dev": ["pytest", "requests", "pytest-cov>=2.10.1"]},
+    extras_require={
+        "dev": ["pytest", "requests", "pytest-cov>=2.10.1"],
+        "s3": ["s3fs==2022.10.0", "boto3"],
+    },
     package_data={"dask_deltatable": ["*.pyi" "__init__.pyi", "core.pyi"]},
     include_package_data=True,
     zip_safe=False,

Specify AWS Permissions if reading from S3

My sense from reading upstream issues on AWS credentials is that they're not interested in scraping AWS permissions from the various locations that exist (environment variables, config files, API endpoints) and are leaving this to downstream libraries. (example issue: apache/arrow-rs#4238)

We should maybe check whether the location starts with s3://, and if so, open up botocore, get whatever credentials we need from there, and add them appropriately to storage_options and delta_storage_options.
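
A sketch of that check, assuming botocore is importable and that these particular option key names are what the two backends expect (the key names may well differ in practice):

import botocore.session


def _maybe_inject_s3_credentials(path, storage_options=None, delta_storage_options=None):
    """Scrape credentials via botocore for s3:// paths and merge them into both option dicts."""
    storage_options = dict(storage_options or {})
    delta_storage_options = dict(delta_storage_options or {})
    if not path.startswith("s3://"):
        return storage_options, delta_storage_options
    creds = botocore.session.get_session().get_credentials()
    if creds is None:
        return storage_options, delta_storage_options
    frozen = creds.get_frozen_credentials()
    found = {"AWS_ACCESS_KEY_ID": frozen.access_key, "AWS_SECRET_ACCESS_KEY": frozen.secret_key}
    if frozen.token:
        found["AWS_SESSION_TOKEN"] = frozen.token
    # Explicit user-provided options win over anything we scraped.
    return {**found, **storage_options}, {**found, **delta_storage_options}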

Release soon?

Lots of exciting progress has been made here lately. Do we want to think about doing a release here soon?

Problem with `pyarrow` dependency when installing dask-deltatable

I'm following the README instructions to install dask-deltatable from conda-forge. It fails with the following error message:

UnsatisfiableError: The following specifications were found to be incompatible with each other:

Output in format: Requested package -> Available versionsThe following specifications were found to be incompatible with your system:

  - feature:/osx-arm64::__osx==12.7.1=0
  - feature:|@/osx-arm64::__osx==12.7.1=0
  - dask-deltatable -> pyarrow -> __osx[version='>=10.9']

Your installed version is: 12.7.1

Seems strange since clearly my osx version satisfies the >=10.9 condition. Will investigate further but leaving it here in case others run into the same problem.

I'm running on an M1 Mac with python==3.9

[ EDIT ]

  • creating a clean conda env and running the conda install -c conda-forge dask-deltatable fails with a similar error

Finalize API for writing Delta Tables

The initial API for writing Delta Lake is a little bit clunky for the user.

When reading, users have to do something like this:

from dask_deltatable import read_delta_table
ddf = read_delta_table("path_to_table")

To write, they need this:

from dask_deltatable.write import to_deltalake
out = to_deltalake("path_to_table", ddf)
out.compute()

TODO:

  • naming is not consistent: read_delta_table vs to_deltalake. Any of the following combos would be more consistent:
    1. read_delta_table/write_delta_table
    2. read_deltalake/write_deltalake
    3. read_delta_table/to_delta_table
    4. read_deltalake/to_deltalake
  • to_deltalake should be exposed at the top level, same as read_delta_table
  • the user shouldn't need to call compute as an extra step; add a compute: bool kwarg instead (see the sketch after this list)
  • #17
  • #16
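
A sketch of the proposed ergonomics, using the names from combo 4 above; the compute kwarg is part of the proposal, not the current API:

import dask_deltatable as ddt

ddf = ddt.read_deltalake("path_to_table")
ddt.to_deltalake("path_to_table", ddf)                                 # computes eagerly by default
delayed_write = ddt.to_deltalake("path_to_table", ddf, compute=False)  # opt out and compute later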

Implement writing with categoricals

We can't currently write categoricals to DeltaLake.

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask_deltatable.write import to_deltalake


if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "c1": pd.Series(np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100), dtype="category"),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    to_deltalake("t4_data", ddf).compute()

This raises:

Traceback (most recent call last):
  File "/Users/jbennet/src/dask-deltatable/t4.py", line 14, in <module>
    to_deltalake("t4_data", ddf).compute()
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 310, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/base.py", line 595, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 319, in reraise
    raise exc
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/Users/jbennet/mambaforge/envs/dask-deltatable/lib/python3.9/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/Users/jbennet/src/dask-deltatable/dask_deltatable/write.py", line 180, in _commit
    _write_new_deltalake(
Exception: Schema error: Invalid data type for Delta Lake: Dictionary(Int8, Utf8)

Pyarrow supports writing categories; delta-rs does not.
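
A user-side workaround sketch until categorical support lands: cast the categorical column from the reproducer above back to plain strings before writing.

ddf = ddf.assign(c1=ddf["c1"].astype(str))
to_deltalake("t4_data", ddf).compute()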

Relax requirements pins?

I noticed that the requirements for deltalake, fsspec, pyarrow, and s3fs are pinned exactly. This makes it challenging for users to use dask_deltatable in other environments which might require slightly different versions of those libraries.

Is it possible to relax those pins, or do they need to be pinned exactly?

What are the limitations of to_deltalake?

First of all, thanks for the effort.

I am evaluating Dask for my scientific computations, which read a Delta table and write results as a Delta table. The README says there is a limitation when writing, but it's unclear what it is. Can you please share some details?

Note: I also saw this page where they explain using pandas, but I'm unsure if I can use write_deltalake in Dask.

Example in Readme not reproducible

The last example in the README says:

ddt.to_deltalake(df, "s3://bucket_name/delta_path")

However, this does not seem compatible with the latest version of dask-deltatable. It should instead be:

ddt.to_deltalake("s3://bucket_name/delta_path", df)

or to keep the initial order of arguments:

ddt.to_deltalake(df=df, table_or_uri="s3://bucket_name/delta_path")

Pass `filter` to `file_uris` to skip partitions

I have a Delta dataset that's partitioned by a couple of columns. Currently, if a user uses filter to select data from a subset of those columns, dask-deltatable will still make a Dask DataFrame partition for each file in the original dataset. Here's an example (this should be runnable, as long as you have requests and adlfs):

import os
import dask_deltatable
import requests

token = requests.get("https://planetarycomputer.microsoft.com/api/sas/v1/token/ms-buildings").json()["token"]
storage_options = {"account_name": "bingmlbuildings", "sas_token": token}

df = dask_deltatable.read_delta_table(
    "az://footprints/delta/2023-04-25/ml-buildings.parquet",
    storage_options=storage_options, delta_storage_options=storage_options,
    filter=[("RegionName", "=", "Turkey")]
)

print(df)

Which prints out

Dask DataFrame Structure:
                  geometry meanHeight RegionName quadkey
npartitions=26028
                    object    float32     object   int32
                       ...        ...        ...     ...
...                    ...        ...        ...     ...
                       ...        ...        ...     ...
                       ...        ...        ...     ...
Dask Name: from-delayed, 26029 graph layers

It seems that DeltaTable.file_uris can take a partition filter and correctly filter out partitions that don't match. With a small change to dask-deltatable, we can forward that from the user to the DeltaTable:

diff --git a/dask_deltatable/core.py b/dask_deltatable/core.py
index 383ded4..c22ddca 100644
--- a/dask_deltatable/core.py
+++ b/dask_deltatable/core.py
@@ -6,7 +6,7 @@ from urllib.parse import urlparse
 import dask
 import dask.dataframe as dd
 import pyarrow.parquet as pq
-from boto3 import Session
+# from boto3 import Session
 from dask.base import tokenize
 from dask.dataframe.io import from_delayed
 from dask.delayed import delayed
@@ -145,7 +145,7 @@ class DeltaTableWrapper(object):
             ]
         dask.compute(parts)[0]
 
-    def get_pq_files(self) -> List[str]:
+    def get_pq_files(self, partition_filters=None) -> List[str]:
         """
         get the list of parquet files after loading the
         current datetime version
@@ -154,13 +154,14 @@ class DeltaTableWrapper(object):
 
         if self.datetime is not None:
             self.dt.load_with_datetime(self.datetime)
-        return self.dt.file_uris()
+        return self.dt.file_uris(partition_filters=partition_filters)
 
     def read_delta_table(self, **kwargs) -> dd.core.DataFrame:
         """
         Reads the list of parquet files in parallel
         """
-        pq_files = self.get_pq_files()
+        filter = kwargs.get("filter")
+        pq_files = self.get_pq_files(partition_filters=filter)
         if len(pq_files) == 0:
             raise RuntimeError("No Parquet files are available")
         parts = [

Now if I run that, you'll see the Dask DataFrame only has 266 partitions.

Dask DataFrame Structure:
                geometry meanHeight RegionName quadkey
npartitions=266
                  object    float32     object   int32
                     ...        ...        ...     ...
...                  ...        ...        ...     ...
                     ...        ...        ...     ...
                     ...        ...        ...     ...
Dask Name: from-delayed, 267 graph layers

I'll make a PR with this shortly.

Overwriting tables

I went to use mode="overwrite" with to_deltalake and saw that it isn't implemented quite yet:

if mode == "overwrite":
    # FIXME: There are a couple of checks that are not migrated yet
    raise NotImplementedError("mode='overwrite' is not implemented")

Looking at the comment there, I'm not sure exactly what additional checks are still needed.

Support pyarrow types_mapper kwarg

To provide more flexibility when reading data, particularly for data types that are not yet native to Python (e.g. decimals), it would be helpful to support the pyarrow types_mapper kwarg to give users more control over how their data is read.
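
For illustration, this is the kind of mapper a user would pass; whether and how read_deltalake should forward it to pyarrow's to_pandas() is exactly the open question here:

import pandas as pd
import pyarrow as pa


def decimals_as_arrow(arrow_type):
    """Keep decimal columns as Arrow-backed pandas dtypes instead of Python objects."""
    if pa.types.is_decimal(arrow_type):
        return pd.ArrowDtype(arrow_type)
    return None  # fall back to the default conversion

# e.g. pyarrow_table.to_pandas(types_mapper=decimals_as_arrow)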

Reuse metadata from deltalake when reading parquet

In dask-deltatable, when calling dd.read_parquet, perhaps we can reuse the metadata already preserved in the Delta JSON log, instead of collecting it from the Parquet files all over again.

Here:

df = dd.read_parquet(dt.file_uris(), **kwargs)

It looks like dd.read_parquet will have to go through the parquet files to read the metadata, but the DeltaTable should have all that info already.

`read_deltalake` vs `read_parquet` performance

I did a quick test reading DeltaLake data in a notebook on a Coiled cluster from s3, with dd.read_parquet vs ddt.read_deltalake.

Cluster: https://cloud.coiled.io/clusters/245026/information?account=dask-engineering.

Data is located in s3://coiled-datasets/delta/.

Results:

| dataset | computation | timing (read_parquet) | timing (read_deltalake) |
| --- | --- | --- | --- |
| ds20f_100M | ddf["int1"].sum().compute() | CPU times: user 43.5 ms, sys: 10.7 ms, total: 54.2 ms; Wall time: 8.04 s | CPU times: user 159 ms, sys: 38.4 ms, total: 198 ms; Wall time: 55.3 s |
| ds20f_100M | ddf.describe().compute() | CPU times: user 256 ms, sys: 28.7 ms, total: 284 ms; Wall time: 20.7 s | CPU times: user 380 ms, sys: 60.7 ms, total: 441 ms; Wall time: 1min 10s |
| ds25f_250M | ddf["int1"].sum().compute() | CPU times: user 67.1 ms, sys: 15.6 ms, total: 82.7 ms; Wall time: 16.7 s | CPU times: user 666 ms, sys: 176 ms, total: 842 ms; Wall time: 3min 59s |
| ds25f_250M | ddf.describe().compute() | CPU times: user 605 ms, sys: 70.3 ms, total: 675 ms; Wall time: 1min 10s | CPU times: user 1.02 s, sys: 181 ms, total: 1.2 s; Wall time: 4min 2s |
| ds50f_500M | ddf["int1"].sum().compute() | CPU times: user 204 ms, sys: 49.2 ms, total: 253 ms; Wall time: 1min 2s | CPU times: user 2.93 s, sys: 626 ms, total: 3.56 s; Wall time: 16min 46s |
| ds50f_500M | ddf.describe().compute() | CPU times: user 3.59 s, sys: 383 ms, total: 3.97 s; Wall time: 5min 53s | killed before finished |

This doesn't look good, and needs looking into.

Consider using dataset fragments instead of parquet uris

I saw this example and thought it might be a better way to load data.

Right now it looks like you rely on being able to just read from the Parquet files and load the partition values from HIVE-style directories. This isn't robust in two ways:

  1. HIVE-style directories aren't guaranteed in the Delta Lake format. The delta protocol states that "This directory format is only used to follow existing conventions and is not required by the protocol. Actual partition values for a file must be read from the transaction log." 1
  2. Deletion vectors and column mapping mean reading the parquet files as-is won't give you the correct data, once we start supporting reader protocols 2 and 3.

In the future, it would be best not to rely on reading from the file URIs and instead read from the dataset fragments, which will provide the correct data as the Delta Protocol continues to evolve.

Footnotes

  1. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#data-files

Failed import when running `deltalake==0.14.0`

To reproduce:

  • create new mamba/conda env
  • mamba install python==3.11 pip
  • pip install deltalake==0.14
  • pip install dask-deltatable
  • run import dask_deltatable

ImportError: cannot import name '_write_new_deltalake' from 'deltalake.writer' (/Users/rpelgrim/miniforge3/envs/deltalake-0130/lib/python3.11/site-packages/deltalake/writer.py)

To solve:

  • pip install deltalake==0.13

Order data by partitions if available

I've stored a bunch of data partitioned by date, and written it to delta using the deltalake package like so:

for df in dfs:
    write_deltalake("mytable", df, partition_by="date")

(although actually this was done in parallel, and so things may have been written out of order):

├── date=2024-01-01
│   ├── part-00001-5c3d1646-6a8b-4511-87f5-3cd0acf1c0e8-c000.zstd.parquet
│   ├── part-00001-869909e1-1079-49db-83d6-77a43d67370a-c000.zstd.parquet
│   └── part-00001-eeb823e8-b9ed-49bd-86d9-9f28f0f444b7-c000.zstd.parquet
└── date=2024-01-02
    ├── part-00001-32e6c973-6d2a-4132-82dd-e6b431cf5343-c000.zstd.parquet
    ├── part-00001-426c2af2-6e86-4cfe-86c9-853f243c35e6-c000.zstd.parquet
    ├── part-00001-6fa10ba1-6b14-4908-b328-f8a8fdaec258-c000.zstd.parquet
    └── part-00001-79cd49d4-dee1-4957-8cdf-86b6f86f95f6-c000.zstd.parquet

When I go to read it, I find that the data isn't sorted by partition:

df = ddt.read_deltalake("mytable")
df.date.head()
0    2024-01-02
1    2024-01-02
2    2024-01-02
3    2024-01-02
4    2024-01-02
Name: date, dtype: date32[day][pyarrow]
df.date.tail()
1392523    2024-01-01
1392524    2024-01-01
1392525    2024-01-01
1392526    2024-01-01
1392527    2024-01-01
Name: date, dtype: date32[day][pyarrow]

I think we should order things if we can. I propose the following:

  1. If partitions are available, order by partitions
  2. Maybe within that we can look at partition statistics? These are stored within the "stats" attribute of the deltalake metadata
  3. We could also think about setting an index with the partition value.

Probably both the effort and uncertainty increase as we go down that list. The first item seems pretty straightforward to me though.
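
A sketch of how item 1 could work, assuming deltalake's get_add_actions(flatten=True) exposes per-file partition values (the "partition.date" column name matches the example table above):

from deltalake import DeltaTable

dt = DeltaTable("mytable")
add_actions = dt.get_add_actions(flatten=True).to_pandas()
ordered_paths = add_actions.sort_values("partition.date")["path"].tolist()
# Build the Dask partitions from ordered_paths instead of the unordered dt.file_uris().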

`ImportError` with `deltalake=0.16.0`

When using the deltalake=0.16.0 release (released yesterday), I get the following error when attempting to import dask_deltatable

Traceback (most recent call last):
  File "/Users/james/projects/coiled/etl-tpch/workflow.py", line 8, in <module>
    from pipeline.reduce import query_reduce
  File "/Users/james/projects/coiled/etl-tpch/pipeline/reduce.py", line 6, in <module>
    import dask_deltatable as ddt
  File "/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/dask_deltatable/__init__.py", line 9, in <module>
    from .write import to_deltalake as to_deltalake
  File "/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/dask_deltatable/write.py", line 18, in <module>
    from deltalake.writer import (
ImportError: cannot import name 'MAX_SUPPORTED_WRITER_VERSION' from 'deltalake.writer' (/Users/james/mambaforge/envs/etl-tpch/lib/python3.11/site-packages/deltalake/writer.py)

`read_deltalake` breaks with dask>=2024.3.1

Hey folks! Amazing work on the query planning functionality with dask-expr :)
I was just running some deltalake queries, and it seems like this upgrade is interfering with read_deltalake.

I'm getting NotImplementedError: dask_expr does not support a token argument when running the code snippet below.

To reproduce:

from deltalake import write_deltalake
import dask_deltatable as ddt
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({
    "group": [1, 1, 2, 2, 3, 3, 4, 4],
    "num": list(range(8)),
    "letter": ["a", "b", "c", "d", "e", "f", "g", "h"],
})

write_deltalake("some-table", df, partition_by="group")

delta_path = "some-table"
ddf = ddt.read_deltalake(delta_path)

To fix:

  • downgrade to dask==2024.2.1
  • OR run
import dask
dask.config.set({'dataframe.query-planning': False})

EDIT
I've tried upgrading dask-expr as suggested here. This upgrades Dask to 2024.4.1 but does not fix the issue for me.

Support writing and reading back index

Implement writing and reading back Dask dataframe index.

Right now, after reading back, the index will become just another column:

from dask.datasets import timeseries
from dask_deltatable import read_delta_table
from dask_deltatable.write import to_deltalake


if __name__ == "__main__":
    ddf = timeseries("2023-07-10", "2023-07-12", freq="1H")
    print(f"\nOriginal:\n{ddf.head()}")
    ddf.index = ddf.index.astype("datetime64[us]")
    to_deltalake("t2_data", ddf).compute()
    ddf2 = read_delta_table("t2_data")
    print(f"\nRead back:\n{ddf2.head()}")

Output:

Original:
                         name    id         x         y
timestamp
2023-07-10 00:00:00  Patricia   989  0.748295  0.513837
2023-07-10 01:00:00     Jerry   993  0.487847 -0.493534
2023-07-10 02:00:00    Victor  1035  0.851038 -0.941464
2023-07-10 03:00:00     Sarah   966  0.878928 -0.087768
2023-07-10 04:00:00     Quinn   983  0.712967 -0.977796

Read back:
       name    id         x         y           timestamp
0  Patricia   989  0.748295  0.513837 2023-07-10 00:00:00
1     Jerry   993  0.487847 -0.493534 2023-07-10 01:00:00
2    Victor  1035  0.851038 -0.941464 2023-07-10 02:00:00
3     Sarah   966  0.878928 -0.087768 2023-07-10 03:00:00
4     Quinn   983  0.712967 -0.977796 2023-07-10 04:00:00

Original PR: #29.
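
Until then, a user-side workaround sketch is to persist the index as a regular column and restore it on read:

to_deltalake("t2_data", ddf.reset_index()).compute()
ddf2 = read_delta_table("t2_data").set_index("timestamp")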

Implement `mode == "overwrite"` in `to_deltalake`

The first version of writing into deltalake did not implement overwriting an existing table. Currently, this is raising a ValueError (should actually raise a NotImplementedError):

import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask_deltatable.write import to_deltalake


if __name__ == "__main__":
    df = pd.DataFrame({
        "i1": np.random.randint(1, 10000, size=100),
        "f1": np.random.random(100),
        "s1": np.random.choice(["Apple", "Banana", "Watermelon", "Mango"], size=100),
    })
    ddf = dd.from_pandas(df, npartitions=10)
    to_deltalake("t1_data", ddf, mode="overwrite").compute()

Raises:

Traceback (most recent call last):
  File "/Users/jbennet/src/dask-deltatable/t1.py", line 14, in <module>
    to_deltalake("t1_data", ddf, mode="overwrite").compute()
  File "/Users/jbennet/src/dask-deltatable/dask_deltatable/write.py", line 82, in to_deltalake
    raise ValueError(
ValueError: Schema of data does not match table schema
Table schema:
None
Data Schema:
i1: int64
f1: double
s1: string

Original PR: #29.
