
dask-expr's Introduction

Dask


Dask is a flexible parallel computing library for analytics. See documentation for more information.

LICENSE

New BSD. See License File.

dask-expr's People

Contributors

charlesbluca, crusaderky, fjetter, graingert, hendrikmakait, j-bennet, jorisvandenbossche, jrbourbeau, milesgranger, mrocklin, phofl, rjzamora, scharlottej13


dask-expr's Issues

Add another optimization step to accommodate ops that require expensive meta information (e.g. ``divisions``)

Matt and I chatted offline about #166 and potential traps when implementing set_index.

Generally, we shouldn't use divisions while simplifying the Expression tree and pushing operations up and down (I'll open another issue about how we can partially avoid this with align-like logic). Computing divisions might be expensive, and accessing them multiple times for a set_index/sort_values/... call will trigger multiple computations. So it's better to avoid them whenever possible.

Some methods require information about the divisions to select the best algorithm (merge, for example). We might pass through a _simplify_down step multiple times depending on the structure of our Expression tree, which could require re-computing the divisions each time merge is hit.

We can avoid this conflict by adding another optimization step that comes after simplify, lower for example. This step can accommodate optimizations that require information that is relatively expensive to obtain (like divisions). Ideally we avoid multiple passes in this case.
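
A rough sketch of what this could look like, using the proposed name lower and otherwise hypothetical method names (not the actual dask-expr API):

def optimize(expr):
    # Phase 1: structural rewrites (projection pushdown etc.) that never
    # touch expensive metadata such as divisions
    expr = expr.simplify()
    # Phase 2 ("lower"): rewrites that are allowed to inspect expensive
    # metadata, e.g. choosing a merge algorithm based on divisions.
    # Ideally this runs as a single pass so divisions are computed at most once.
    expr = expr.lower()
    return expr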

We should keep in mind that we need a more complex structure in the future, which would require us to generalize the different optimization steps.

cc @rjzamora

Selecting from Index with duplicates raises or returns incorrect results

import pandas as pd

from dask_expr import from_pandas

df = pd.DataFrame({"a": [1, 2, 3], "bb": 1}, index=["a", "a", "b"])
ddf = from_pandas(df)

ddf.a["b"].compute()

This raises

Traceback (most recent call last):
  File "/Users/patrick/Library/Application Support/JetBrains/PyCharm2023.1/scratches/dask_epr.py", line 11, in <module>
    print(ddf.a["b"].compute())
          ^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 314, in compute
    (result,) = compute(self, traverse=False, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 583, in compute
    collections, repack = unpack_collections(*args, traverse=traverse)
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 474, in unpack_collections
    repack_dsk[out] = (tuple, [_unpack(i) for i in args])
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 474, in <listcomp>
    repack_dsk[out] = (tuple, [_unpack(i) for i in args])
                               ^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 435, in _unpack
    if is_dask_collection(expr):
       ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 186, in is_dask_collection
    return x.__dask_graph__() is not None
           ^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/collection.py", line 86, in __dask_graph__
    out = out.simplify()
          ^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 227, in simplify
    out = expr._simplify_down()
          ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 836, in _simplify_down
    assert a == b
           ^^^^^^
AssertionError

Process finished with exit code 1

while

ddf.a["a"].compute()

returns

a    1
a    2
b    3
Name: a, dtype: int64

Move over Shuffle

So far we've migrated two pieces of sizable functionality:

  • Apply-concat-apply
  • Read_parquet

The next chunk that I think would be good to move over is shuffling.

It would also be nice if we could simplify things as we move them over. In at least the ACA case (the only one I have real knowledge of) the code got significantly simpler (my guess is 25% of the original size). My hope is that we could do something similar with shuffling as well.

Handle API differences between DataFrame and Series

Right now we implement type-agnostic methods on Expr and then have DataFrame and Series mostly defer to Expr when they don't have something. This has been effective at getting us pretty far with low boilerplate while staying decently (but not entirely) simple.
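
A rough sketch of that deferral pattern, with hypothetical names (not the actual dask-expr code):

class Series:
    def __init__(self, expr):
        self.expr = expr

    def __getattr__(self, key):
        # Anything not defined on the collection falls back to the
        # type-agnostic implementation on Expr
        return getattr(self.expr, key)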

However, sometimes this doesn't work, as with operations like value_counts, unique, or drop_duplicates, where DataFrame and Series have different APIs, or with operations like nlargest, where keywords differ between DataFrame and Series.

What should we do?

Caching ``__dask_graph__``

Currently, we are building the graph multiple times before doing the actual computation. This hasn't been a problem so far, since the optimizations were cheap. We should cache the graph to avoid multiple passes as soon as we start adding methods that might trigger computations during the optimization step (see #181).
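
A minimal sketch of the idea, keyed on the expression token and using hypothetical names (not the actual dask-expr implementation):

class Collection:
    def __init__(self, expr):
        self.expr = expr
        self._graph_cache = {}  # expression token -> materialized graph

    def __dask_graph__(self):
        key = self.expr._name  # token of the current (unoptimized) expression
        if key not in self._graph_cache:
            optimized = self.expr.simplify()  # potentially expensive
            self._graph_cache[key] = optimized.__dask_graph__()
        return self._graph_cache[key]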

Keep ReadParquet Lengths computation lazy?

Currently we're doing computation in the optimize call. I wonder if instead we should have another Expression. This would replace the Literal value with something that would return a graph with a single task.

Just a thought, not a strong one.

Release on conda-forge

Now that we have a release out, it'd be nice to release on conda-forge too (@mmccarty mentioned this at the monthly community meeting).

Pushing a ``Projection`` through an operation that operates on a subset of columns

A relatively common operation is something like the following:

df.drop_duplicates(subset=["a"]).b

We can optimise this to something like

df[["a", "b"]].drop_duplicates(subset=["a"]).b

Doing this with our Projection class would cause a RecursionError though, since we would try to push b up over and over again. Thoughts on how we can solve this?

  • Implement another class that isn't used in the simplify_up steps
  • Add an attribute that controls whether or not we can simplify
  • ...
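
Whichever option we pick, the rewrite needs a termination condition. A self-contained sketch of a "would this projection be a no-op?" check over plain column lists (a toy helper, not real dask-expr code):

def projection_to_push(frame_columns, subset, requested):
    """Columns to project the upstream frame to, or None if the projection
    would be a no-op (re-pushing a no-op is what causes the recursion)."""
    needed = sorted(set(subset) | set(requested))
    if sorted(frame_columns) == needed:
        return None  # already minimal; stop rewriting here
    return needed

assert projection_to_push(["a", "b", "c"], subset=["a"], requested=["b"]) == ["a", "b"]
assert projection_to_push(["a", "b"], subset=["a"], requested=["b"]) is None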

Add more Elemwise / Blockwise operations

We have many simple operations like add, mul, assign, astype, filter, etc. that were easy to add. This part of the codebase seems pretty stable right now.

There are probably many more operations like these that we could bring over without much uncertainty. I propose that we do this, but that someone other than @rjzamora or @mrocklin take it on. This would be a good first issue.

Some example operations:

  • round
  • more operators
  • to_timestamp
  • map
  • applymap
  • rename
  • fillna
  • dropna
  • sample
  • replace
  • abs
  • align
  • clip
  • between
  • combine_first
  • explode
  • to_frame
  • isin

Create separate Expr and API classes

There are a few issues that motivate the desire to have a separate encapsulating class:

class DataFrame:
    expr: Expr

class Series:
    expr: Expr

...

Those issues are as follows:

Mutation

For cases like the following:

df["z"] = df.x + df.y

We need to be able to do something like the following:

def __setitem__(self, column, value):
    expr = self.expr.assign(**{column: value})  # assign under the given column name
    self.expr = expr  # we need to be able to do this for mutation

This requires that the object the user holds can have all of its metadata rewritten and that the underlying operation can be changed. Relying on types for the operation means that this is a no-go with the current single-class architecture.

Class Explosion

DataFrame, Series, and Index have different APIs that we'll need to present to the user. The easiest way to do this is with new classes. However, if we're also using classes for the operation (Sum, Add, Join), then we enter n-by-k subclass hell. Composition handles this fine.
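
A minimal sketch of the composition approach (all names here are illustrative, not the actual dask-expr classes): operation classes subclass Expr, while the user-facing collections wrap an Expr and only override where their APIs differ.

class Expr:
    pass

class Sum(Expr):            # operation classes live on the expression side
    def __init__(self, frame):
        self.frame = frame

class Add(Expr):
    def __init__(self, left, right):
        self.left, self.right = left, right

class DataFrame:            # user-facing API wraps an Expr (composition)
    def __init__(self, expr):
        self.expr = expr

    def sum(self):
        return type(self)(Sum(self.expr))

    def __add__(self, other):
        return type(self)(Add(self.expr, other.expr))

class Series(DataFrame):    # only override where the Series API differs
    pass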

Polluting the API

Expressions have various attributes like .frame, .left, .right, which we probably don't want to present to the user because it'll end up confusing things with column attributes and so on. We probably want to hide these a bit. With the proposed architecture these become df.expr.left which is appropriately hidden. We'll always have the parameters take precedence over anything else, and this will probably be fine because users won't touch this layer by default.

Concerns

I haven't done this yet just because it adds a bit of development inertia (have to add two methods for everything, or else drop syntactic sugar for Exprs).

Optimizing Filters and Merges

I was writing down some tests to help me think through swapping filters and merges

import pandas as pd

import dask_expr as dx


def test_filter_merge():
    a = pd.DataFrame({"a": range(5), "b": range(5), "c": range(5)})
    b = pd.DataFrame({"c": [0, 2, 4, 6, 8], "x": range(5), "y": range(5)})

    a = dx.from_pandas(a)
    b = dx.from_pandas(b)

    # Some simple cases
    df = a.merge(b)
    df = df[df.x > 3]
    bb = b[b.x > 3]
    expected = a.merge(bb)
    assert df.optimize()._name == expected.optimize()._name

    df = a.merge(b)
    df = df[df.b > 3]
    aa = a[a.b > 3]
    expected = aa.merge(b)
    assert df.optimize()._name == expected.optimize()._name

    # Apply to both!
    df = a.merge(b)
    df = df[df.c > 3]
    aa = a[a.c > 3]
    bb = b[b.c > 3]
    expected = aa.merge(bb)
    assert df.optimize()._name == expected.optimize()._name

    # Works with more complex expressions, and multiple columns
    df = a.merge(b)
    df = df[df.a > df.b + 1]
    aa = a[a.a > a.b + 1]
    expected = aa.merge(b)
    assert df.optimize()._name == expected.optimize()._name

    # Only apply if all columns are in the table, not if only some are
    df = a.merge(b)
    df = df[df.c > df.x + 1]
    bb = b[b.c > b.x + 1]
    expected = a.merge(bb)
    assert df.optimize()._name == expected.optimize()._name

    # Bail if we engage non-elemwise expressions in the predicates
    df = a.merge(b)
    df = df[df.x > df.y.sum()]
    bb = b[b.x > b.y.sum()]
    not_expected = a.merge(bb)
    assert df.optimize()._name != not_expected.optimize()._name

I think that what I came to is that we want to push a filter into one side or the other if

  1. the predicate is valid when we replace the merged table with the left or right side. Something like if predicate.subs({merged: left}) doesn't raise an error (after we figure out renaming of columns).
  2. The predicate has only elemwise expressions (nothing like df.x < df.y.sum() for example)

And so maybe an implementation here relies on ...

  1. Making a proxy object for left that has no expression history, but does have the same ._meta
  2. Calling substitute and seeing if it errs
  3. Scanning through the substituted predicate expression and seeing if everything in it is Elemwise
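
For the column-membership part of (1), here is a self-contained sketch over plain column-name sets (illustrative only; the real check would go through substitution on ._meta and handle column renaming):

def pushdown_side(predicate_columns, left_columns, right_columns):
    """Which merge input can the filter be pushed into?"""
    cols = set(predicate_columns)
    left_ok = cols <= set(left_columns)
    right_ok = cols <= set(right_columns)
    if left_ok and right_ok:
        return "both"     # e.g. filtering on the join column "c" above
    if left_ok:
        return "left"
    if right_ok:
        return "right"
    return None           # predicate straddles both sides; bail

assert pushdown_side(["x"], ["a", "b", "c"], ["c", "x", "y"]) == "right"
assert pushdown_side(["c"], ["a", "b", "c"], ["c", "x", "y"]) == "both"
assert pushdown_side(["a", "x"], ["a", "b", "c"], ["c", "x", "y"]) is None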

I think that that should cover most cases and be fairly robust.

Things get a bit more complex if we also want to support And clauses (there are cases where we could split a predicate into two parts and each part applies to one side) but I think that this is probably pretty rare and not important.

I don't plan to do this work, but it was interesting to think through. I suspect that at some point we'll run into this problem, either in a benchmark or in practice and in that case I hope that the tests and discussion above help a little.

We tend to read data multiple times from storage

I exchanged a couple of messages with Rick on this topic yesterday.

Our optimization logic has a weakness when it comes to reading data (the issue extends to other methods as well, but let's focus on I/O to keep it a bit simpler). We tend to read data multiple times when we have different branches in our tree from the same root. One example:

df = read_parquet("taxi.parquet")
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0

q = df[["tipped", "bpf_valid", "tips"]].sum()

tips is read twice in this case: once for the NE expression that is used for the assignment and once more for the sum aggregation at the end.

This is a problem that gets worse as the query gets more complicated. It's triggered whenever one operation operates on two DataFrames from the same root.

Ideally, we would only read the data once.

read_parquet(path, columns=["base_passenger_fare", "tips"], ...)["tips"]

There might be reasons not to do this (e.g. memory pressure), but that is probably only a consideration when we are not reading from remote storage. Our current optimization logic does not provide a mechanism to address this.

This isn't a first-order priority, but we should address it soonish.
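
One possible direction is to unify the column requests of branches that read from the same source. A self-contained sketch over plain paths and column lists (a toy helper, not real dask-expr logic):

from collections import defaultdict

def unify_reads(reads):
    """reads: iterable of (path, columns) pairs from different branches.
    Returns one read per path with the union of the requested columns."""
    by_path = defaultdict(set)
    for path, columns in reads:
        by_path[path].update(columns)
    return {path: sorted(cols) for path, cols in by_path.items()}

reads = [
    ("taxi.parquet", ["tips"]),                  # from the "tipped" branch
    ("taxi.parquet", ["base_passenger_fare"]),   # from the "bpf_valid" branch
    ("taxi.parquet", ["tips"]),                  # from the final sum
]
assert unify_reads(reads) == {"taxi.parquet": ["base_passenger_fare", "tips"]}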

This extends to multiple operations as well:

df = read_parquet("taxi.parquet")
df = df.replace(1, 100).fillna(100)
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0

q = df[["tipped", "bpf_valid", "tips"]].sum()

We will apply replace and fillna to tips twice as well.

cc @rjzamora

[Proposal] Add "phases" argument to `Expr.simplify`

I spent some time thinking about #166, #181 and #186 yesterday, and experimented a bit with optimization ordering. I played with some relatively complex approaches, but ultimately decided to propose something pretty simple.

General Idea: We add an optional phases: tuple argument to simplify, move the graph-traversal logic into a _simplify method, and add an allow_group argument to _simplify, _simplify_up, and _simplify_down.

Rough Code Outline:

class AbstractExpr(Expr):
    # "Example" Expr class
    
    _simplified = None  # Use attribute for hashing (can be done in separate work)

    def _simplify_down(self, allow_group: tuple):
        # Hypothetical example
        if "repartition" in allow_group:
            # Hold off on "lowering" until "repartition" phase
            return RepartitionDivisions(...)
    
    def _simplify_up(self, parent, allow_group: tuple):
        # Probably don't need to check allow_group for projection
        if isinstance(parent, Projection):
            ...
                
def simplify(self, phases=("abstract", "lower", "repartition")):
    # NOTE: Could always add/change phases later on
    
    # Loop over all "phases"
    expr = self
    for p in range(len(phases)):
        allow_group = phases[:p+1]
        
        # For each phase, we do the usual "simplify" loop
        # calling _simplify_down, _simplify_up and _simplify
        # until nothing changes. We pass in `allow_group`
        # to enforce rough ordering of optimizations.
        ...
        
    return expr

def _simplify(self, allow_group: tuple):
    
    # Use cached result if available
    if (self._simplified or {}).get(allow_group) is not None:
        return self._simplified[allow_group]
    
    # Usual "simplify" loop here, but `allow_group`
    # is passed to `_simplify_up` and `_simplify_down`
    expr = self
    ...
    
    # Cache result
    self._simplified = {allow_group: expr}
    return expr

Annotations

We currently store annotations on high level layers. We'll probably want to do the same thing with Exprs now. Some complications:

  1. We often recreate Exprs and will need to make sure that we propagate these annotations
  2. Things like rewrite rules will need to be sensitive to annotations (probably not optimize by default)

I don't plan to think about this too much until other questions are answered. This doesn't seem as critical.

Need CI

It's probably time to implement CI. We can probably start lightweight, maybe just one environment (linux, Python 3.10 or something) running on GitHub Actions or whatever the normal thing is these days.

Expr in columns is True

Not sure what's going on here. Not a huge deal though:

In [1]: from dask_expr.datasets import timeseries

In [2]: df = timeseries()

In [3]: df.head()
Out[3]:
                        name    id         x         y
timestamp
2000-01-01 00:00:00   Ingrid  1023 -0.745347 -0.032617
2000-01-01 00:00:01   Oliver   973 -0.280432  0.715767
2000-01-01 00:00:02    Sarah  1008  0.629102  0.581916
2000-01-01 00:00:03  Michael   996 -0.112556  0.247828
2000-01-01 00:00:04   Ingrid  1058 -0.439967  0.958142

In [4]: df.x in df.columns
Out[4]: False

In [5]: df.expr.x in df.expr.columns
Out[5]: True

Add Tail

This would be a good first issue. Probably folks would want to look at Head.

Expression rewriting options

@rjzamora pointed me at this, and I had some thoughts around the term rewriting aspects.

Peephole-based rewrite systems like matchpy are quite powerful, but they do suffer from some flaws if local rewrites aren't always sending you downhill in the cost-function landscape. Enumerating all possible expressions and picking the best one with backtracking is exponential in the number of rewrites and the size of the expression.
One also needs to be a little careful, when applying all rules, that there aren't pairs of rules that undo each other's work.

An alternative approach, which may be too heavyweight in this instance, is equality saturation using e-graphs. These are a compact data structure for representing all combinations of rewritten terms at once (polynomial rather than exponential). I like the egg library, which comes with a (rather underdocumented) Python interface via snake-egg. There is probably no need to consider it at the moment (since exploring the landscape of what the expression system looks like is more difficult than mechanically translating from one rewriting system to another).

[Bug] Non-deterministic `tokenize` for `cudf` and `pyarrow` objects

As discussed in an older Dask issue (dask/dask#6718), GPU objects do not produce a deterministic hash within tokenize. This is a problem for dask-expr, because we assume that an expression can be uniquely identified by the result of tokenize(*operands) (where operands may include GPU objects). While experimenting with this, I also discovered that some pyarrow objects (e.g. pa.Table) also produce a non-deterministic token.

In the long term, this can be resolved upstream in cudf by implementing deterministic object hashing. We can also leverage the normalize_token dispatch to convert cudf objects into a smaller (but representative) tuple of pandas/numpy objects:

from dask.base import normalize_token
import cudf

@normalize_token.register((cudf.DataFrame, cudf.Series, cudf.Index))
def _tokenize_cudf_object(obj):
    # Convert cudf objects to representative pd/np data
    return (
        type(obj),  # Type info
        (
            obj.to_frame()
            if not isinstance(obj, cudf.DataFrame)
            else obj
        ).dtypes,  # Schema info
        (
            obj.to_frame()
            if isinstance(obj, cudf.Index)
            else obj.reset_index()
        ).hash_values().to_numpy(),  # Hashed rows
    )

Although this kind of solution requires device -> host data movement to convert the hashed-row series into numpy, the amount of data that needs to move will be minimal in performance-sensitive cases, because users should not be using an API like from_pandas.

Overall Question: "How do we want to deal with non-deterministic tokenization in general?" That is, will we rely on normalize_token dispatching? If so, should we add the dispatch implementations in dask/dask, or should we provide a space in dask-expr?

Release

I think that @phofl plans to do an initial release today. Mostly raising this issue so that @rjzamora is aware (going ahead without waiting for approval though).

Proposal: Decouple `_parameters` and `__getattr__`/`__setattr__`

While working through #1 and #2, I ran into some problems with the behavior of __getattr__, which currently looks like this:

    def __getattr__(self, key):
        if key == "__name__":
            return object.__getattribute__(self, key)
        elif key in type(self)._parameters:
            idx = type(self)._parameters.index(key)
            return self.operands[idx]
        elif key in dir(type(self)):
            return object.__getattribute__(self, key)
        elif is_dataframe_like(self._meta) and key in self._meta.columns:
            return self[key]
        else:
            return object.__getattribute__(self, key)

The problem is that we currently treat values in _parameters as high-priority attributes of the object, and have no mechanism to deal with the fact that there may be overlap between those parameter names and other attribute keys or column names. I don't think this behavior is maintainable.

I propose that we require a distinct syntax to access parameter values (e.g. self.parameter() or self.operand()). I also don't think it is necessary to provide a public API for setting a parameter value (currently provided by __setattr__).
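
For reference, the explicit accessor could be as small as the positional lookup that __getattr__ already performs; a rough sketch of the proposed method (hypothetical, shown only to illustrate):

class Expr:
    _parameters = []

    def operand(self, key):
        # Explicit, low-priority access to a declared parameter value
        return self.operands[type(self)._parameters.index(key)]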

Make uber-lyft example work

I (and others) often use this notebook when giving Dask dataframe demos: https://github.com/coiled/examples/blob/main/uber-lyft.ipynb

I tried running it and, to my surprise, some things actually worked! Not everything though, in particular:

  • value_counts doesn't exist
  • Groupby aggregations like df.groupby("hvfhs_license_num").tipped.mean().compute() didn't seem to work
AttributeError                            Traceback (most recent call last)
Cell In[11], line 3
      1 df["tipped"] = df.tips != 0
----> 3 df.groupby("hvfhs_license_num").tipped.mean().compute()

AttributeError: 'GroupBy' object has no attribute 'tipped'

My thinking is that by going after common workflows like this we'll find ourselves with a good objective function to help us prioritize.

Add more reductions

This should be a good first issue for someone who isn't @rjzamora or @mrocklin. Some not-yet-existing but easy reductions:

  • any
  • all
  • prod
  • idxmin/idxmax
  • mode
  • var/std (this one might get strange)
  • describe_1d (this one might get strange)
  • nbytes
  • drop_duplicates / unique (this one might get strange)
  • value_counts
  • nlargest / nsmallest
  • memory_usage

AttributeError: 'SortValues' object has no attribute 'user_divisions'

from dask_expr.datasets import timeseries
df = timeseries()
df = df.sort_values("x")
df["z"] = df.x + df.y

The output is a little strange here. I'm not yet sure why.

AttributeError                            Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
    151 try:
--> 152     return object.__getattribute__(self, key)
    153 except AttributeError as err:

AttributeError: 'SortValues' object has no attribute 'user_divisions'

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
    151 try:
--> 152     return object.__getattribute__(self, key)
    153 except AttributeError as err:

File ~/mambaforge/envs/dask-expr/lib/python3.11/functools.py:1001, in cached_property.__get__(self, instance, owner)
   1000 if val is _NOT_FOUND:
-> 1001     val = self.func(instance)
   1002     try:

File ~/workspace/dask-expr/dask_expr/_expr.py:696, in Expr.divisions(self)
    694 @functools.cached_property
    695 def divisions(self):
--> 696     return tuple(self._divisions())

File ~/workspace/dask-expr/dask_expr/_shuffle.py:627, in BaseSetIndexSortValues._divisions(self)
    626 def _divisions(self):
--> 627     if self.user_divisions is not None:
    628         return self.user_divisions

File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
    172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
    174     f"{err}\n\n"
    175     "This often means that you are attempting to use an unsupported "
    176     f"API function. Current API coverage is documented here: {link}."
    177 )

AttributeError: 'SortValues' object has no attribute 'user_divisions'

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
    151 try:
--> 152     return object.__getattribute__(self, key)
    153 except AttributeError as err:

File ~/mambaforge/envs/dask-expr/lib/python3.11/functools.py:1001, in cached_property.__get__(self, instance, owner)
   1000 if val is _NOT_FOUND:
-> 1001     val = self.func(instance)
   1002     try:

File ~/workspace/dask-expr/dask_expr/_expr.py:696, in Expr.divisions(self)
    694 @functools.cached_property
    695 def divisions(self):
--> 696     return tuple(self._divisions())

File ~/workspace/dask-expr/dask_expr/_expr.py:1668, in Projection._divisions(self)
   1667     return (None, None)
-> 1668 return super()._divisions()

File ~/workspace/dask-expr/dask_expr/_expr.py:1061, in Blockwise._divisions(self)
   1060     if not self._broadcast_dep(arg):
-> 1061         assert arg.divisions == dependencies[0].divisions
   1062 return dependencies[0].divisions

File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
    172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
    174     f"{err}\n\n"
    175     "This often means that you are attempting to use an unsupported "
    176     f"API function. Current API coverage is documented here: {link}."
    177 )

AttributeError: 'SortValues' object has no attribute 'user_divisions'

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
    151 try:
--> 152     return object.__getattribute__(self, key)
    153 except AttributeError as err:

File ~/workspace/dask-expr/dask_expr/_expr.py:712, in Expr.npartitions(self)
    711 else:
--> 712     return len(self.divisions) - 1

File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
    172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
    174     f"{err}\n\n"
    175     "This often means that you are attempting to use an unsupported "
    176     f"API function. Current API coverage is documented here: {link}."
    177 )

AttributeError: 'SortValues' object has no attribute 'user_divisions'

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

During handling of the above exception, another exception occurred:

AttributeError                            Traceback (most recent call last)
Cell In[1], line 4
      2 df = timeseries()
      3 df = df.sort_values("x")
----> 4 df["z"] = df.x + df.y

File ~/workspace/dask-expr/dask_expr/_collection.py:79, in _wrap_expr_op(self, other, op)
     77 if not isinstance(other, expr.Expr):
     78     return new_collection(getattr(self.expr, op)(other))
---> 79 elif expr.are_co_aligned(self.expr, other):
     80     return new_collection(getattr(self.expr, op)(other))
     81 else:

File ~/workspace/dask-expr/dask_expr/_expr.py:2187, in are_co_aligned(*exprs)
   2185 def are_co_aligned(*exprs):
   2186     """Do inputs come from different parents, modulo blockwise?"""
-> 2187     exprs = [expr for expr in exprs if not is_broadcastable(expr)]
   2188     ancestors = [set(non_blockwise_ancestors(e)) for e in exprs]
   2189     unique_ancestors = {
   2190         # Account for column projection within IO expressions
   2191         _tokenize_partial(item, ["columns", "_series"])
   2192         for item in flatten(ancestors, container=set)
   2193     }

File ~/workspace/dask-expr/dask_expr/_expr.py:2187, in <listcomp>(.0)
   2185 def are_co_aligned(*exprs):
   2186     """Do inputs come from different parents, modulo blockwise?"""
-> 2187     exprs = [expr for expr in exprs if not is_broadcastable(expr)]
   2188     ancestors = [set(non_blockwise_ancestors(e)) for e in exprs]
   2189     unique_ancestors = {
   2190         # Account for column projection within IO expressions
   2191         _tokenize_partial(item, ["columns", "_series"])
   2192         for item in flatten(ancestors, container=set)
   2193     }

File ~/workspace/dask-expr/dask_expr/_expr.py:2168, in is_broadcastable(s)
   2163 def is_broadcastable(s):
   2164     """
   2165     This Series is broadcastable against another dataframe in the sequence
   2166     """
-> 2168     return s.ndim == 1 and s.npartitions == 1 and s.known_divisions or s.ndim == 0

File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
    170     return self[key]
    172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
    174     f"{err}\n\n"
    175     "This often means that you are attempting to use an unsupported "
    176     f"API function. Current API coverage is documented here: {link}."
    177 )

AttributeError: 'SortValues' object has no attribute 'user_divisions'

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.

Benchmark failures - bugs/missing operations

  • GroupBy.value_counts
  • iloc
  • select_dtypes
  • Repartition misses keyword-argument partition_size
  • GroupBy.apply
  • set_index
  • grouping by a Series object
  • grouping by an Index object
  • assign with callable
  • GroupBy.agg shuffle keyword
  • missing align step in add/sub, ...
  • missing keywords in read_csv
  • "p2p" shuffle

I'm not planning on fixing all of these immediately; I'm just collecting failures and fixing whatever was on my agenda anyway or is a very small fix.

Reference build: https://github.com/coiled/benchmarks/actions/runs/5310108176/jobs/9611644148?pr=837

Culling

There has been pain around graph culling in the past. We want to be efficient in cases where people ask for only a small subset of the output result, and that requires only a small subset of the graph to be generated. This occurs in situations like .head and array slicing.

My hope is that a lot of this pain goes away with high level optimization. For example we should have optimizations that push .head through many operations (everything except IO, shuffling, ACA, etc.). Even then there are a couple of options:

  1. Have operations like read_parquet and shuffle know the output partitions that are requested as part of their own metadata (so the head operation disappears entirely once optimized)
  2. Keep the head optimization around, and accept some wasted generation, but feel glad because it's only happening on one layer, rather than on many layers.

Regardless, a good first step here, I think, is to build some optimizations around Head.
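
A toy sketch of the shape of such a rewrite, pushing Head through elementwise operations and stopping at IO/shuffle-like nodes (toy classes, not the real dask-expr expressions):

class Expr:
    def __init__(self, frame=None, **kwargs):
        self.frame = frame
        self.__dict__.update(kwargs)

class Elemwise(Expr): ...
class ReadParquet(Expr): ...
class Head(Expr): ...

def push_head_down(head):
    child = head.frame
    if isinstance(child, Elemwise):
        # Head(Elemwise(x)) -> Elemwise(Head(x)): only one partition's worth
        # of upstream work remains in the graph
        return Elemwise(Head(child.frame, n=head.n))
    return None  # stop at IO, shuffling, ACA, ...

expr = Head(Elemwise(ReadParquet()), n=5)
pushed = push_head_down(expr)
assert isinstance(pushed, Elemwise) and isinstance(pushed.frame, Head)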

Something's off with Parquet optimization and computing

This is one of my standard examples, and it's quite sad today:

import dask.dataframe as dd
import dask_expr as dx

df = dx.read_parquet(
    "s3://coiled-datasets/uber-lyft-tlc/",
)
df["tipped"] = df.tips != 0
df.groupby(df.hvfhs_license_num).tipped.mean()

It looks like this second cell triggers computation when it should be instantaneous. I suspect the parquet lengths stuff. cc @rjzamora

Meta cannot reflect the dtype when restricting columns later on

Meta has dtype float64 in the following example, since it takes the float column into account:

import pandas as pd

from dask_expr import from_pandas

df = pd.DataFrame({"a": [11, 12, 31], "b": 1.5})
ddf = from_pandas(df, npartitions=1)
result = ddf.prod()["a"]
result._meta
result.compute()

The result has dtype int64, since we dropped the float column before computing. I'm not sure if/when we can address this, but I wanted to keep track of it, so I'm opening this issue.

Add to_parquet

If we're looking to have a realistic workload, then my guess is that the read_parquet -> shuffle -> to_parquet pipeline is a good first one. @rjzamora maybe something to think about?

DataFrame merge should use HashJoinLayer instead of two separate P2Ps

import dask_expr
from distributed import Client
c = Client()
df = dask_expr.datasets.timeseries()
df2 = dask_expr.datasets.timeseries()
sum_x = (
    df
    .merge(df2, on=["name", "id"])
    .x_x.sum()
)
print(sum_x.optimize().tree_repr())
Sum(TreeReduce): split_every=0
  Fused(8cf79):
  | Sum(Chunk):
  |   Projection: columns='x_x'
  |     BlockwiseMerge: left_on=['name', 'id'] right_on=['name', 'id']
  |       Projection: columns=['x', 'name', 'id']
            P2PShuffle: partitioning_index='_partitions' npartitions_out=30 ignore_index=False options=None
              Fused(d0cfd):
              | AssignPartitioningIndex: partitioning_index=['name', 'id'] index_name='_partitions' npartitions_out=30
              |   Timeseries: dtypes={'x': <class 'float'>, 'name': 'string', 'id': <class 'int'>} seed=1132422555
  |       Projection: columns=['name', 'id', 'x']
            P2PShuffle: partitioning_index='_partitions' npartitions_out=30 ignore_index=False options=None
              Fused(f9ad4):
              | AssignPartitioningIndex: partitioning_index=['name', 'id'] index_name='_partitions' npartitions_out=30
              |   Timeseries: dtypes={'name': 'string', 'id': <class 'int'>, 'x': <class 'float'>} seed=946678847

This is using a BlockwiseMerge that acts on two P2PShuffles, but it should instead be using a HashJoin, see dask/distributed#7514 dask/distributed#7496
(a shortcoming of the scheduler heuristics means it will not schedule two P2P shuffles efficiently; other binary ops can be affected similarly).

Groupby API

We have the ACA structure for this. There's now a lot of grunt work to bring over the algorithms.

Also! We need to figure out how we want to represent the intermediate Groupby object. That should be interesting.

Remove DataFrame assumptions from Expr

Eventually we'll have all sorts of Exprs like arrays, bags, etc. Today we're mostly focused on dataframes, and that's good I think.

However, even today we have non-DataFrame exprs like scalars, and the results of to_parquet calls. Maybe it's time to pull out some of the dataframe metadata assumptions like divisions and maybe even meta from the Expr class and make some new Frame class (or some better name) from which dataframe-like-things (dataframe, series, index) inherit.

Selecting single row after reduction raises RecursionError when checking the index in ``assert_eq``

import pandas as pd
from dask.dataframe import assert_eq

from dask_expr import from_pandas

df = pd.DataFrame({"a": [1, 2, 3], "bb": 1})
ddf = from_pandas(df)

assert_eq(ddf.max()["a"], df.max()["a"])

raises

  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 134, in __getattr__
    return object.__getattribute__(self, key)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 841, in _meta
    return self.frame._meta.index
           ^^^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/reductions.py", line 100, in _meta
    meta = self.chunk(meta, **self.chunk_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/reductions.py", line 144, in chunk
    out = cls.reduction_chunk(df, **kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/utils.py", line 1105, in __call__
    return getattr(__obj, self.method)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11646, in max
    return NDFrame.max(self, axis, skipna, numeric_only, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11185, in max
    return self._stat_function(
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11158, in _stat_function
    return self._reduce(
           ^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/series.py", line 4666, in _reduce
    return op(delegate, skipna=skipna, **kwds)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 158, in f
    result = alt(values, axis=axis, skipna=skipna, **kwds)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 421, in new_func
    result = func(values, axis=axis, skipna=skipna, mask=mask, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 1083, in reduction
    values, mask, dtype, dtype_max, fill_value = _get_values(
                                                 ^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 313, in _get_values
    values = extract_array(values, extract_numpy=True)  # type: ignore[assignment]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/construction.py", line 441, in extract_array
    if isinstance(obj, (ABCIndex, ABCSeries)):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/dtypes/generic.py", line 44, in _instancecheck
    return _check(inst) and not isinstance(inst, type)
           ^^^^^^^^^^^^
  File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/dtypes/generic.py", line 38, in _check
    return getattr(inst, attr, "_typ") in comp
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded in comparison

Distributed protocol

We would like to send expressions up to the Dask scheduler, have them optimized and lowered there, and then enter the normal scheduler state. To do this I think that we need a new route for client/scheduler communication. Maybe that can be made more general and accept "something picklable that satisfies the .__dask_foo__ protocols" or something similar.

Eventually I'd love it if we held on to the Expr structure in the scheduler to inform future optimizations, but as a first pass I think that we probably just try to end up with the same state that the current update_graph route achieves.

@fjetter I think that this would benefit from someone on your team

Make ``Projection`` positional instead of label based

I looked a bit into what an iloc implementation could look like (we need this for some of the more complicated reductions to adjust for potentially duplicated columns). I haven't gone down this rabbit hole terribly far, because I wanted to avoid doing something that you might have considered before.

Right now, Projection is label based, which works nicely but doesn't mesh very well with positional indexers. We could implement another class that does the same for positional column indexers, but this would require special-casing every _simplify_up call for both of them (simply because df[positional_indexer] does not work). If we converted the columns given to Projection into positional indexes, we could merge both of them together and generalise the _simplify_up steps.

It also makes estimating things easier, since label-based selection won't tell you the dimensions of your frame when there are duplicated columns, but this is a side note.

cc @mrocklin @rjzamora wdyt?

Some thoughts on high-level reasoning

I had two conversations recently with @fjetter and @phofl about reasoning about dataframes. Often during different stages (expression building, optimization, task graph building) we want to know various attributes of our dataframe. Here are a few things people have asked for in the past:

  1. Is it sorted along this column?
  2. How many partitions do we have?
  3. How many rows are in each partition?
  4. What are the min/max values of each partition in a sorted dataframe so that we can do joins better?

Historically we relied on divisions to answer many of these questions, and we're still used to doing that. However, this isn't great, because divisions can change as we optimize (maybe we decide to repartition based on worker size, for example) and because divisions don't hold all of the information we want (like the lengths of partitions).

Instead, I think that we need to get used to asking questions of expressions by either inspecting the current tree, or by using other expressions. A few examples:

  1. @rjzamora added a Lengths expression, which computes the length of each partition, and knows how to optimize itself through Elemwise and ReadParquet calls so that, oftentimes, we can convert a Lengths(df) expression into a Literal.

    This is an example of using an expression to answer a question about an existing expression

  2. We could ask if two dataframes share the same partition alignment by asking if they have the same ancestors modulo Blockwise. For example the following are co-aligned:

    df = read_parquet(...)
    a = df.x[df.z > 1] + 10
    b = df.y - df.z.sum()
    assert are_co_aligned(a, b)

    This is an example of looking at an expression tree to answer a question.

  3. Similarly to the above, we might infer that a column is sorted if it had previously been the target of a set_index or sort_values call and was only operated on by monotonic expressions (we'd have to add this to known monotonic expressions).

General principles:

  1. rely on divisions less
  2. try to only capture user inputs in Expression operands
  3. think about asking questions using expression structure, or with new expressions with fun optimizations

[Question] What is the best plan for task fusion

Dask-match certainly makes most high-level optimizations simpler than they would be with Dask's current HLG design. However, it is not yet clear to me how task fusion should come into play. There seem to be three general paths forward:

  1. We do nothing about task fusion on the expression side. Instead, we rely on the scheduler to either perform low-level fusion, or to use something like speculative task assignment.
  2. We implement a new mechanism within dask-match to enable the fusion of "blockwise" parent and child expressions.
  3. We modify the "blockwise" _layer methods to output Blockwise HLG layers instead of low-level dictionaries.

I like that (1) allows us to keep the logic in dask-match a bit simpler, but I'm not very confident that this approach will deliver the same benefits as (2) or (3). (3) is clearly the "fastest" way to support task fusion, but we will eventually need to migrate the necessary subgraph-callable logic into the Expr version of Blockwise if the long-term plan is to scrap HLGs altogether.

Fallback to `dask.dataframe` methods

We're working to fill out the DataFrame API here. Several important operations work, but there's still a lot of API surface not covered. One option would be to fall back to the existing dask.dataframe implementation when a dask-expr method doesn't exist.

This is just one possible option. I don't think we should focus on this at this stage. Just logging the idea.

AttributeError: 'Series' object has no attribute 'std'

Is this expected behavior?

Repro:

import dask_expr as dx
df = dx.datasets.timeseries(dtypes={"name": "string", "id": int, "quantity": int, "spend": float})
df["quantity"].std()

Stacktrace:

Traceback (most recent call last):
  File "lib/python3.9/site-packages/dask_expr/_collection.py", line 175, in __getattr__
    val = getattr(self.expr, key)
  File "lib/python3.9/site-packages/dask_expr/_expr.py", line 148, in __getattr__
    raise err
  File "lib/python3.9/site-packages/dask_expr/_expr.py", line 137, in __getattr__
    return object.__getattribute__(self, key)
AttributeError: 'Projection' object has no attribute 'std'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "lib/python3.9/site-packages/dask_expr/_collection.py", line 181, in __getattr__
    raise err
  File "lib/python3.9/site-packages/dask_expr/_collection.py", line 170, in __getattr__
    return object.__getattribute__(self, key)
AttributeError: 'Series' object has no attribute 'std'

Aggregations in groupby are broken

They somehow pull a lot of data onto one worker.

test_q3 from the coiled benchmarks is a reproducer; a simpler one:

from dask_expr import read_parquet

uri = "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet"
ddf = read_parquet(uri, engine="pyarrow", storage_options={"anon": True})
ddf = ddf[["id3", "v1", "v3"]]
(
    ddf.groupby("id3", dropna=False, observed=True)[["v1"]].sum()
    #.agg({"v1": "sum", "v3": "mean"})
    .compute()
)

Column selection and aggregation works but takes forever; using agg kills the worker. This is with 30 workers (which is way more than we'd normally need).

Figuring out whether 2 frames are aligned

So far I have used divisions to identify whether or not two frames are aligned when this was necessary for a given operation.

A different approach, that is potentially cheaper:

Both frames are aligned as long as we can find a common root for both of them and we only pass through Blockwise operations while walking the expression tree. This can be used as a shortcut to figure out whether or not two frames are aligned without accessing any metadata.
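
A minimal, self-contained sketch of that tree walk, with toy stand-in classes (not the real dask-expr Expr/Blockwise):

class Node:                     # toy expression nodes
    def __init__(self, *deps):
        self.deps = deps

class Blockwise(Node):
    pass

class IO(Node):
    pass

def non_blockwise_roots(expr):
    """Walk up through Blockwise nodes and yield the remaining roots."""
    stack = [expr]
    while stack:
        node = stack.pop()
        if isinstance(node, Blockwise):
            stack.extend(node.deps)
        else:
            yield node

def are_aligned(a, b):
    # Aligned if both trees bottom out in the same non-Blockwise roots
    return set(non_blockwise_roots(a)) == set(non_blockwise_roots(b))

root = IO()
x = Blockwise(Blockwise(root))              # e.g. (df.x > 1) + 10
y = Blockwise(root)                         # e.g. df.y
assert are_aligned(x, y)
assert not are_aligned(x, Blockwise(IO()))  # different roots -> needs a repartition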

Other cases will still require a repartitioning call.
