Dask is a flexible parallel computing library for analytics. See documentation for more information.
New BSD. See License File.
License: BSD 3-Clause "New" or "Revised" License
Matt and I chatted offline about #166 and potential traps when implementing set_index.
Generally, we shouldn't use divisions while simplifying the Expression tree and pushing stuff up and down (I'll open another issue about how we can avoid this partially with align-like stuff). Computing divisions might be expensive, and accessing them multiple times for a set_index/sort_values/... call will trigger multiple computations. So it's better to avoid them whenever possible.
Some methods will require information about the divisions to select the best algorithm (merge as an example). We might pass a _simplify_down step multiple times depending on the structure of our Expression tree, which would potentially require re-computing the divisions each time that merge is hit by this.
We can avoid this conflict by adding another optimization step that comes after simplify, lower for example. This step can accommodate optimizations that require information that is relatively expensive to obtain (like divisions). Ideally we avoid multiple passes in this case.
We should keep in mind that we need a more complex structure in the future, which would require us to generalize the different optimization steps.
cc @rjzamora
Not a big deal, but .print() has one less character and (might?) feel more familiar
Motivated by #233
We need a space to document the available API coverage, and to leave notes about current limitations.
import pandas as pd
from dask_expr import from_pandas
df = pd.DataFrame({"a": [1, 2, 3], "bb": 1}, index=["a", "a", "b"])
ddf = from_pandas(df)
ddf.a["b"].compute()
This raises
Traceback (most recent call last):
File "/Users/patrick/Library/Application Support/JetBrains/PyCharm2023.1/scratches/dask_epr.py", line 11, in <module>
print(ddf.a["b"].compute())
^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 314, in compute
(result,) = compute(self, traverse=False, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 583, in compute
collections, repack = unpack_collections(*args, traverse=traverse)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 474, in unpack_collections
repack_dsk[out] = (tuple, [_unpack(i) for i in args])
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 474, in <listcomp>
repack_dsk[out] = (tuple, [_unpack(i) for i in args])
^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 435, in _unpack
if is_dask_collection(expr):
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/base.py", line 186, in is_dask_collection
return x.__dask_graph__() is not None
^^^^^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/collection.py", line 86, in __dask_graph__
out = out.simplify()
^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 227, in simplify
out = expr._simplify_down()
^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 836, in _simplify_down
assert a == b
^^^^^^
AssertionError
Process finished with exit code 1
while
ddf.a["a"].compute()
returns
a 1
a 2
b 3
Name: a, dtype: int64
So far we've migrated two pieces of sizable functionality
The next chunk I think it would be good to move over is shuffling.
It would also be nice if we could simplify things as we move them over. In at least the ACA case (the only one I have real knowledge of) the code got significantly simpler (my guess is 25% the original size). My hope is that we could do something similar with shuffling as well.
@jrbourbeau mentioned that it might be interesting to try dask-expr out on some of the benchmarks in https://github.com/coiled/benchmarks .
My guess is that nothing will work yet, but that it'll highlight some low-hanging fruit to pick.
Anyone could do this. I think that @jrbourbeau is somewhat interested, although he's also out for a bit so it might make sense for someone else to try this out.
Right now we implement type-agnostic methods on Expr and then have DataFrame and Series mostly defer to Expr when they don't have something. This has been effective at getting us pretty far with little boilerplate, while also being decently (but not entirely) simple.
However sometimes this doesn't work, as with operations like value_counts or unique or drop_duplicates where DataFrame and Series have different APIs, or in operations like nlargest where keywords differ based on the DataFrame/Series difference.
What should we do?
Currently, we are building the graph multiple times before doing the actual computation. This wasn't a problem, since the optimizations were cheap. We should cache the graph to avoid multiple passthroughs as soon as we start adding methods that might trigger computations during the optimization step (see #181)
Currently we're doing computation in the optimize call. I wonder if instead we should have another Expression. This would replace the Literal value with something that would return a graph with a single task.
Just a thought, not a strong one.
Now that we have a release out, it'd be nice to release on conda-forge too (@mmccarty mentioned this on the monthly community meeting)
A relatively common operation is something like the following:
df.drop_duplicates(subset=["a"]).b
We can optimise this to something like
df[["a", "b"]].drop_duplicates(subset=["a"]).b
Doing this with our Projection class would cause a RecursionError though, since we would try to push b up over and over again. Thoughts on how we can solve this?
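As a sanity check on the rewrite above, here is a minimal pandas-only sketch (no dask-expr involved; the sample frame and its extra column "c" are made up for illustration) showing that selecting the needed columns first gives the same result:

```python
import pandas as pd

# Hypothetical sample data; column "c" is never needed by the query.
df = pd.DataFrame({"a": [1, 1, 2, 2], "b": [10, 20, 30, 40], "c": list("wxyz")})

# Original query: deduplicate on "a", then select "b".
direct = df.drop_duplicates(subset=["a"]).b

# Rewritten query: keep only the columns the query touches, then do the same.
projected = df[["a", "b"]].drop_duplicates(subset=["a"]).b

# Raises if the two results differ in values, index, name, or dtype.
pd.testing.assert_series_equal(direct, projected)
```

Note that the projection has to keep the subset columns as well as the selected column, so the pushed-up projection is ["a", "b"] and not just "b".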
We have many simple operations like add, mul, assign, astype, filter, etc. that were easy to add. This part of the codebase seems pretty stable right now.
There are probably many more operations like these we could bring over without much uncertainty. I propose that we do this, but by having someone else do it, anyone who isn't @rjzamora and @mrocklin . This would be a good first issue.
Some example operations:
I ran the benchmarks yesterday. Many of them are still failing because of #159
Link to the results: https://github.com/coiled/benchmarks/actions/runs/5313713615
A new run is here: https://github.com/coiled/benchmarks/actions/runs/5957150408
It looks like we are consistently a little bit slower on shuffle-based workloads, but I wouldn't look into this right now. The h2o results look mostly good. Filter is a bit weird, but this could be because of some time series problem. Will look into that one.
There are a few issues that motivate the desire to have a separate encapsulating class
class DataFrame:
    expr: Expr
class Series:
    expr: Expr
...
Those issues are as follows:
For cases like the following:
df["z"] = df.x + df.y
We need to be able to do something like the following:
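def __setitem__(self, column, value):
    # Pass the column name dynamically; assign(column=value) would
    # create a column literally named "column"
    expr = self.expr.assign(**{column: value})
    self.expr = expr  # we need to be able to do this for mutation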
This requires that the object the user has needs to be able to have all of its metadata rewritten, and the underlying operation needs to be able to be changed. Relying on types for the operation means that this is a no-go with the current single class architecture.
DataFrame, Series, and Index have different APIs that we'll need to present to the user. The easiest way to do this is with new classes. However if we're also using classes for the operation (Sum, Add, Join) then we enter n by k subclass hell. Composition handles this fine.
Expressions have various attributes like .frame, .left, .right, which we probably don't want to present to the user because it'll end up confusing things with column attributes and so on. We probably want to hide these a bit. With the proposed architecture these become df.expr.left which is appropriately hidden. We'll always have the parameters take precedence over anything else, and this will probably be fine because users won't touch this layer by default.
I haven't done this yet just because it adds a bit of development inertia (have to add two methods for everything, or else drop syntactic sugar for Exprs).
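The composition idea described above can be sketched in plain Python. This is a toy illustration only: the Expr and DataFrame classes here are simplified stand-ins written for this example, not the dask-expr classes, and the positional assign(column, value) signature is an assumption.

```python
class Expr:
    """Toy expression node: an operation name plus its operands."""

    def __init__(self, op, *operands):
        self.op = op
        self.operands = operands

    def assign(self, column, value):
        # Build a new node instead of mutating this one.
        return Expr("assign", self, column, value)

    def __repr__(self):
        args = ", ".join(repr(o) for o in self.operands)
        return f"{self.op}({args})"


class DataFrame:
    """User-facing wrapper; its expression is swapped out on mutation."""

    def __init__(self, expr):
        self.expr = expr

    def __setitem__(self, column, value):
        # The wrapper object stays the same while its expression changes.
        self.expr = self.expr.assign(column, value)


df = DataFrame(Expr("read", "data.parquet"))
before = df.expr
df["z"] = 1
print(df.expr)  # assign(read('data.parquet'), 'z', 1)
```

The user keeps holding the same df object, while expression-level attributes stay one level down under df.expr.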
I was writing down some tests to help me think through swapping filters and merges
import pandas as pd
import dask_expr as dx

def test_filter_merge():
    a = pd.DataFrame({"a": range(5), "b": range(5), "c": range(5)})
    b = pd.DataFrame({"c": [0, 2, 4, 6, 8], "x": range(5), "y": range(5)})
    a = dx.from_pandas(a)
    b = dx.from_pandas(b)
    # Some simple cases
    df = a.merge(b)
    df = df[df.x > 3]
    bb = b[b.x > 3]
    expected = a.merge(bb)
    assert df.optimize()._name == expected.optimize()._name
    df = a.merge(b)
    df = df[df.b > 3]
    aa = a[a.b > 3]
    expected = aa.merge(b)
    assert df.optimize()._name == expected.optimize()._name
    # Apply to both!
    df = a.merge(b)
    df = df[df.c > 3]
    aa = a[a.c > 3]
    bb = b[b.c > 3]
    expected = aa.merge(bb)
    assert df.optimize()._name == expected.optimize()._name
    # Works with more complex expressions, and multiple columns
    df = a.merge(b)
    df = df[df.a > df.b + 1]
    aa = a[a.a > a.b + 1]
    expected = aa.merge(b)
    assert df.optimize()._name == expected.optimize()._name
    # Only apply if all columns are in the table, not if only some are
    df = a.merge(b)
    df = df[df.c > df.x + 1]
    bb = b[b.c > b.x + 1]
    expected = a.merge(bb)
    assert df.optimize()._name == expected.optimize()._name
    # Bail if we engage non-elemwise expressions in the predicates
    df = a.merge(b)
    df = df[df.x > df.y.sum()]
    bb = b[b.x > b.y.sum()]
    not_expected = a.merge(bb)
    assert df.optimize()._name != not_expected.optimize()._name
I think that what I came to is that we want to push a filter into one side or the other if predicate.subs({merged: left}) doesn't raise an error (after we figure out renaming of columns) and the predicate contains only elemwise expressions (not df.x < df.y.sum(), for example).
And so maybe an implementation here relies on a version of left that has no expression history but does have the same ._meta, on calling substitute and seeing if it errs, and on checking for Elemwise.
I think that should cover most cases and be fairly robust.
Things get a bit more complex if we also want to support And clauses (there are cases where we could split a predicate into two parts and each part applies to one side) but I think that this is probably pretty rare and not important.
I don't plan to do this work, but it was interesting to think through. I suspect that at some point we'll run into this problem, either in a benchmark or in practice and in that case I hope that the tests and discussion above help a little.
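For intuition, the equivalence that the tests above rely on can be checked with plain pandas. This is only a sketch on eager data: the frames mirror the ones in the test, but the threshold x > 1 was picked here so that the result is non-empty, and nothing in it touches dask-expr's optimizer.

```python
import pandas as pd

a = pd.DataFrame({"a": range(5), "b": range(5), "c": range(5)})
b = pd.DataFrame({"c": [0, 2, 4, 6, 8], "x": range(5), "y": range(5)})

# Filter after the merge ...
merged = a.merge(b)
late = merged[merged.x > 1].reset_index(drop=True)

# ... versus filtering the right-hand side first.
early = a.merge(b[b.x > 1]).reset_index(drop=True)

pd.testing.assert_frame_equal(late, early)

# A reduction in the predicate breaks the equivalence, because the mean
# of y over the merged rows differs from the mean of y over all of b.
late_agg = merged[merged.x > merged.y.mean()]
early_agg = a.merge(b[b.x > b.y.mean()])
assert len(late_agg) == 1 and len(early_agg) == 0
```

The second half is the reason for bailing out on non-elemwise predicates: the aggregate sees different rows depending on which side of the merge it runs on.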
Maybe it's time for this to stop living in my personal github account
I exchanged a couple of messages with Rick on this topic yesterday.
Our optimization logic has a weakness when it comes to reading data (the issue extends to other methods as well, but let's focus on I/O to keep it a bit simpler). We tend to read data multiple times when we have different branches in our tree from the same root. One example:
df = read_parquet("taxi.parquet")
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0
q = df[["tipped", "bpf_valid", "tips"]].sum()
tips is read twice in this case: once for the NE expression that is used for assignment and once more for the sum aggregation at the end.
This is a problem that gets worse when the query gets more complicated. It's triggered if one operation operates on 2 DataFrames from the same root.
Ideally, we would only read the data once.
read_parquet(path, columns=["base_passenger_fare", "tips"], ...)["tips"]
There might be reasons not to do this, e.g. memory pressure, but this is probably only an option when we are not reading from remote storage. Our current optimization logic does not provide a mechanism to address this.
This isn't first-order priority but we should address this soonish.
This extends to multiple operations as well:
df = read_parquet("taxi.parquet")
df = df.replace(1, 100).fillna(100)
df["tipped"] = df.tips != 0
df["bpf_valid"] = df.base_passenger_fare != 0
q = df[["tipped", "bpf_valid", "tips"]].sum()
We will apply replace and fillna to tips twice as well.
cc @rjzamora
I spent some time thinking about #166, #181 and #186 yesterday, and experimented a bit with optimization ordering. I played with some relatively complex approaches, but ultimately decided to propose something pretty simple.
General Idea: We add an optional phases: tuple argument to simplify, move the graph-traversal logic into a _simplify method, and add an allow_group argument to _simplify, _simplify_up, and _simplify_down.
Rough Code Outline:
class AbstractExpr(Expr):
    # "Example" Expr class
    _simplified = None  # Use attribute for hashing (can be done in separate work)

    def _simplify_down(self, allow_group: tuple):
        # Hypothetical example
        if "repartition" in allow_group:
            # Hold off on "lowering" until "repartition" phase
            return RepartitionDivisions(...)

    def _simplify_up(self, parent, allow_group: tuple):
        # Probably don't need to check allow_group for projection
        if isinstance(parent, Projection):
            ...

    def simplify(self, phases=("abstract", "lower", "repartition")):
        # NOTE: Could always add/change phases later on
        # Loop over all "phases"
        expr = self
        for p in range(len(phases)):
            allow_group = phases[:p+1]
            # For each phase, we do the usual "simplify" loop
            # calling _simplify_down, _simplify_up and _simplify
            # until nothing changes. We pass in `allow_group`
            # to enforce rough ordering of optimizations.
            ...
        return expr

    def _simplify(self, allow_group: tuple):
        # Use cached result if available
        if (self._simplified or {}).get(allow_group) is not None:
            # Return the cached expression, not the whole cache dict
            return self._simplified[allow_group]
        # Usual "simplify" loop here, but `allow_group`
        # is passed to `_simplify_up` and `_simplify_down`
        expr = self
        ...
        # Cache result
        self._simplified = {allow_group: expr}
        return expr
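To make the phase ordering concrete, here is a small self-contained toy. It is a sketch under heavy assumptions: expressions are nested tuples, rules only look at the top-level node, and the rule names (drop_double_neg, lower_neg) and the RULES table are invented for this example. Only the phases / allow_group loop mirrors the outline above.

```python
def drop_double_neg(expr):
    # "abstract" rule: ("neg", ("neg", x)) -> x
    if isinstance(expr, tuple) and expr[0] == "neg":
        inner = expr[1]
        if isinstance(inner, tuple) and inner[0] == "neg":
            return inner[1]
    return expr


def lower_neg(expr):
    # "lower" rule: ("neg", x) -> ("mul", -1, x)
    if isinstance(expr, tuple) and expr[0] == "neg":
        return ("mul", -1, expr[1])
    return expr


# Each rule is tagged with the phase that first allows it.
RULES = [("abstract", drop_double_neg), ("lower", lower_neg)]


def simplify(expr, phases=("abstract", "lower")):
    for p in range(len(phases)):
        allow_group = phases[: p + 1]
        changed = True
        while changed:  # usual fixed-point loop, restricted to allowed rules
            changed = False
            for phase, rule in RULES:
                if phase not in allow_group:
                    continue
                new = rule(expr)
                if new is not expr:
                    expr, changed = new, True
    return expr


print(simplify(("neg", ("neg", ("neg", "x")))))  # ('mul', -1, 'x')
```

If lower_neg were allowed in the first phase, it could rewrite the outer neg before the double negation is removed, which is the kind of ordering problem the allow_group argument is meant to prevent.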
We currently store annotations on high level layers. We'll probably want to do the same thing with Exprs now. Some complications:
I don't plan to think about this too much until other questions are answered. This doesn't seem as critical.
It's probably time to implement CI. We can probably start lightweight, maybe just one environment (linux, Python 3.10 or something) running on GitHub Actions or whatever the normal thing is these days.
Not sure what's going on here. Not a huge deal though:
In [1]: from dask_expr.datasets import timeseries
In [2]: df = timeseries()
In [3]: df.head()
Out[3]:
name id x y
timestamp
2000-01-01 00:00:00 Ingrid 1023 -0.745347 -0.032617
2000-01-01 00:00:01 Oliver 973 -0.280432 0.715767
2000-01-01 00:00:02 Sarah 1008 0.629102 0.581916
2000-01-01 00:00:03 Michael 996 -0.112556 0.247828
2000-01-01 00:00:04 Ingrid 1058 -0.439967 0.958142
In [4]: df.x in df.columns
Out[4]: False
In [5]: df.expr.x in df.expr.columns
Out[5]: True
This would be a good first issue. Probably folks would want to look at Head
@rjzamora pointed me at this, and I had some thoughts around the term rewriting aspects.
Peephole-based rewrite systems like matchpy are quite powerful, but do suffer from some flaws if local rewrites aren't always sending you downhill in the cost function landscape. To enumerate all possible expressions and pick the best one with backtracking is exponential in the number of rewrites and size of the expression.
One also needs to be a little careful, when applying all rules, not to have pairs of rules that undo each other's work.
An alternate approach, that may be too heavy-weight in this instance, is equality saturation using e-graphs. These are a compact data structure for representing all combinations of rewritten terms at once (polynomial rather than exponential). I like the egg library which comes with a (rather underdocumented) Python interface via snake-egg. Probably there is no need to consider it at the moment (since exploring the landscape of what the expression system looks like is more difficult than mechanically translating from one rewriting system to another).
As discussed in an older Dask issue (dask/dask#6718), GPU objects do not produce a deterministic hash within tokenize. This is a problem for dask-expr, because we assume that an expression can be uniquely identified by the result of tokenize(*operands) (where operands may include GPU objects). While experimenting with this, I also discovered that some pyarrow objects (e.g. pa.Table) also produce a non-deterministic token.
In the long term, this can be resolved upstream in cudf by implementing deterministic object hashing. We can also leverage the normalize_token dispatch to convert cudf objects into a smaller (but representative) tuple of pandas/numpy objects:
from dask.base import normalize_token
import cudf
@normalize_token.register((cudf.DataFrame, cudf.Series, cudf.Index))
def _tokenize_cudf_object(obj):
    # Convert cudf objects to representative pd/np data
    return (
        type(obj),  # Type info
        (
            obj.to_frame()
            if not isinstance(obj, cudf.DataFrame)
            else obj
        ).dtypes,  # Schema info
        (
            obj.to_frame()
            if isinstance(obj, cudf.Index)
            else obj.reset_index()
        ).hash_values().to_numpy(),  # Hashed rows
    )
Although this kind of solution requires device -> host data movement to convert the hashed-row series into numpy, the amount of data that needs to move will be minimal in performance-sensitive cases, because users should not be using an API like from_pandas.
Overall Question: "How do we want to deal with non-deterministic tokenization in general?" That is, will we rely on normalize_token dispatching? If so, should we add the dispatch implementations in dask/dask, or should we provide a space in dask-expr?
While working through #1 and #2, I ran into some problems with the behavior of __getattr__, which currently looks like this:
def __getattr__(self, key):
    if key == "__name__":
        return object.__getattribute__(self, key)
    elif key in type(self)._parameters:
        idx = type(self)._parameters.index(key)
        return self.operands[idx]
    elif key in dir(type(self)):
        return object.__getattribute__(self, key)
    elif is_dataframe_like(self._meta) and key in self._meta.columns:
        return self[key]
    else:
        return object.__getattribute__(self, key)
The problem is that we currently treat values in _parameters as high-priority attributes of the object, and have no mechanism to deal with the fact that there may be overlap between those parameter names and other attribute keys or column names. I don't think this behavior is maintainable.
I propose that we require a distinct syntax to access parameter values (e.g. self.parameter() or self.operand()). I also don't think it is necessary to provide a public API for setting a parameter value (currently provided by __setattr__).
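A toy version of the proposed split might look like the following. This is a sketch with made-up names (the Node class, its column_names attribute, and the string return values are all hypothetical); it only illustrates that an explicit operand() accessor removes the collision between a parameter called frame and a column called frame.

```python
class Node:
    """Toy stand-in for an expression with named parameters and columns."""

    _parameters = ["frame", "columns"]

    def __init__(self, *operands, column_names=()):
        self.operands = list(operands)
        self.column_names = list(column_names)

    def operand(self, key):
        # Explicit accessor: parameter lookup never goes through __getattr__.
        return self.operands[type(self)._parameters.index(key)]

    def __getattr__(self, key):
        # Only called when normal lookup fails; resolves column names only.
        if key in self.__dict__.get("column_names", ()):
            return f"column:{key}"
        raise AttributeError(key)


node = Node("source", ["frame", "x"], column_names=["frame", "x"])
print(node.operand("frame"))  # source
print(node.frame)             # column:frame
```

With this layout, attribute access is free to mean "column" while parameters are reached only through the explicit method.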
I (and others) often use this example when giving Dask dataframe examples: https://github.com/coiled/examples/blob/main/uber-lyft.ipynb
I tried running it and, to my surprise, some things actually worked! Not everything though, in particular:
df.groupby("hvfhs_license_num").tipped.mean().compute() didn't seem to work
AttributeError Traceback (most recent call last)
Cell In[11], line 3
1 df["tipped"] = df.tips != 0
----> 3 df.groupby("hvfhs_license_num").tipped.mean().compute()
AttributeError: 'GroupBy' object has no attribute 'tipped'
My thinking is that by going after common workflows like this we'll find ourselves with a good objective function to help us prioritize.
This should be a good first issue for someone who isn't @rjzamora or @mrocklin . Some not-yet-existing-but-easy-reductions
from dask_expr.datasets import timeseries
df = timeseries()
df = df.sort_values("x")
df["z"] = df.x + df.y
The output is a little strange here. I'm not yet sure why.
AttributeError Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
151 try:
--> 152 return object.__getattribute__(self, key)
153 except AttributeError as err:
AttributeError: 'SortValues' object has no attribute 'user_divisions'
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
151 try:
--> 152 return object.__getattribute__(self, key)
153 except AttributeError as err:
File ~/mambaforge/envs/dask-expr/lib/python3.11/functools.py:1001, in cached_property.__get__(self, instance, owner)
1000 if val is _NOT_FOUND:
-> 1001 val = self.func(instance)
1002 try:
File ~/workspace/dask-expr/dask_expr/_expr.py:696, in Expr.divisions(self)
694 @functools.cached_property
695 def divisions(self):
--> 696 return tuple(self._divisions())
File ~/workspace/dask-expr/dask_expr/_shuffle.py:627, in BaseSetIndexSortValues._divisions(self)
626 def _divisions(self):
--> 627 if self.user_divisions is not None:
628 return self.user_divisions
File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
174 f"{err}\n\n"
175 "This often means that you are attempting to use an unsupported "
176 f"API function. Current API coverage is documented here: {link}."
177 )
AttributeError: 'SortValues' object has no attribute 'user_divisions'
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
151 try:
--> 152 return object.__getattribute__(self, key)
153 except AttributeError as err:
File ~/mambaforge/envs/dask-expr/lib/python3.11/functools.py:1001, in cached_property.__get__(self, instance, owner)
1000 if val is _NOT_FOUND:
-> 1001 val = self.func(instance)
1002 try:
File ~/workspace/dask-expr/dask_expr/_expr.py:696, in Expr.divisions(self)
694 @functools.cached_property
695 def divisions(self):
--> 696 return tuple(self._divisions())
File ~/workspace/dask-expr/dask_expr/_expr.py:1668, in Projection._divisions(self)
1667 return (None, None)
-> 1668 return super()._divisions()
File ~/workspace/dask-expr/dask_expr/_expr.py:1061, in Blockwise._divisions(self)
1060 if not self._broadcast_dep(arg):
-> 1061 assert arg.divisions == dependencies[0].divisions
1062 return dependencies[0].divisions
File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
174 f"{err}\n\n"
175 "This often means that you are attempting to use an unsupported "
176 f"API function. Current API coverage is documented here: {link}."
177 )
AttributeError: 'SortValues' object has no attribute 'user_divisions'
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
File ~/workspace/dask-expr/dask_expr/_expr.py:152, in Expr.__getattr__(self, key)
151 try:
--> 152 return object.__getattribute__(self, key)
153 except AttributeError as err:
File ~/workspace/dask-expr/dask_expr/_expr.py:712, in Expr.npartitions(self)
711 else:
--> 712 return len(self.divisions) - 1
File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
174 f"{err}\n\n"
175 "This often means that you are attempting to use an unsupported "
176 f"API function. Current API coverage is documented here: {link}."
177 )
AttributeError: 'SortValues' object has no attribute 'user_divisions'
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
During handling of the above exception, another exception occurred:
AttributeError Traceback (most recent call last)
Cell In[1], line 4
2 df = timeseries()
3 df = df.sort_values("x")
----> 4 df["z"] = df.x + df.y
File ~/workspace/dask-expr/dask_expr/_collection.py:79, in _wrap_expr_op(self, other, op)
77 if not isinstance(other, expr.Expr):
78 return new_collection(getattr(self.expr, op)(other))
---> 79 elif expr.are_co_aligned(self.expr, other):
80 return new_collection(getattr(self.expr, op)(other))
81 else:
File ~/workspace/dask-expr/dask_expr/_expr.py:2187, in are_co_aligned(*exprs)
2185 def are_co_aligned(*exprs):
2186 """Do inputs come from different parents, modulo blockwise?"""
-> 2187 exprs = [expr for expr in exprs if not is_broadcastable(expr)]
2188 ancestors = [set(non_blockwise_ancestors(e)) for e in exprs]
2189 unique_ancestors = {
2190 # Account for column projection within IO expressions
2191 _tokenize_partial(item, ["columns", "_series"])
2192 for item in flatten(ancestors, container=set)
2193 }
File ~/workspace/dask-expr/dask_expr/_expr.py:2187, in <listcomp>(.0)
2185 def are_co_aligned(*exprs):
2186 """Do inputs come from different parents, modulo blockwise?"""
-> 2187 exprs = [expr for expr in exprs if not is_broadcastable(expr)]
2188 ancestors = [set(non_blockwise_ancestors(e)) for e in exprs]
2189 unique_ancestors = {
2190 # Account for column projection within IO expressions
2191 _tokenize_partial(item, ["columns", "_series"])
2192 for item in flatten(ancestors, container=set)
2193 }
File ~/workspace/dask-expr/dask_expr/_expr.py:2168, in is_broadcastable(s)
2163 def is_broadcastable(s):
2164 """
2165 This Series is broadcastable against another dataframe in the sequence
2166 """
-> 2168 return s.ndim == 1 and s.npartitions == 1 and s.known_divisions or s.ndim == 0
File ~/workspace/dask-expr/dask_expr/_expr.py:173, in Expr.__getattr__(self, key)
170 return self[key]
172 link = "https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage"
--> 173 raise AttributeError(
174 f"{err}\n\n"
175 "This often means that you are attempting to use an unsupported "
176 f"API function. Current API coverage is documented here: {link}."
177 )
AttributeError: 'SortValues' object has no attribute 'user_divisions'
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
This often means that you are attempting to use an unsupported API function. Current API coverage is documented here: https://github.com/dask-contrib/dask-expr/blob/main/README.md#api-coverage.
partition_size
set_index
assign with callable
GroupBy.agg
shuffle keyword
read_csv
Not planning on fixing all of them immediately, just collecting failures and fixing what was on my agenda anyway or is a very small fix.
Reference build: https://github.com/coiled/benchmarks/actions/runs/5310108176/jobs/9611644148?pr=837
There has been pain around graph culling in the past. We want to be efficient in cases where people ask for only a small subset of the output result, and that requires only a small subset of the graph to be generated. This occurs in situations like .head and array slicing.
My hope is that a lot of this pain goes away with high level optimization. For example we should have optimizations that push .head through many operations (everything except IO, shuffling, ACA, etc.). Even then there are a couple of options:
Regardless, a good first step here, I think, is to build some optimizations around Head
This is one of my standard examples, and it's quite sad today
import dask.dataframe as dd
import dask_expr as dx
df = dx.read_parquet(
"s3://coiled-datasets/uber-lyft-tlc/",
)
df["tipped"] = df.tips != 0
df.groupby(df.hvfhs_license_num).tipped.mean()
It looks like this second cell triggers computation when it should be instantaneous. I suspect the parquet lengths stuff. cc @rjzamora
Meta has dtype float64 in the following example since it takes the float column into account
df = pd.DataFrame({"a": [11, 12, 31], "b": 1.5})
ddf = from_pandas(df, npartitions=1)
result = ddf.prod()["a"]
result._meta
result.compute()
The result has dtype int64 since we dropped the float column before computing the product. Not sure if/when we can address this, but wanted to keep track, so opening this issue
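The same dtype difference can be reproduced with pandas alone, which may help when reasoning about where meta and the optimized result diverge. This is a plain pandas sketch of the two orders of operations, not of the dask-expr code path:

```python
import pandas as pd

df = pd.DataFrame({"a": [11, 12, 31], "b": 1.5})

# Reducing the whole frame upcasts the result to float64 ...
full = df.prod()
# ... while projecting to "a" first keeps the integer dtype.
projected = df[["a"]].prod()

print(full.dtype, projected.dtype)  # float64 int64
```

The values agree (4092 in both cases); only the dtype differs, which matches the mismatch between result._meta and result.compute() described above.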
If we're looking to have a realistic workload then my guess is that the read_parquet -> shuffle -> to_parquet pipeline seems like a good first one. @rjzamora maybe something to think about?
import dask_expr
from distributed import Client
c = Client()
df = dask_expr.datasets.timeseries()
df2 = dask_expr.datasets.timeseries()
sum_x = (
df
.merge(df2, on=["name", "id"])
.x_x.sum()
)
print(sum_x.optimize().tree_repr())
Sum(TreeReduce): split_every=0
Fused(8cf79):
| Sum(Chunk):
| Projection: columns='x_x'
| BlockwiseMerge: left_on=['name', 'id'] right_on=['name', 'id']
| Projection: columns=['x', 'name', 'id']
P2PShuffle: partitioning_index='_partitions' npartitions_out=30 ignore_index=False options=None
Fused(d0cfd):
| AssignPartitioningIndex: partitioning_index=['name', 'id'] index_name='_partitions' npartitions_out=30
| Timeseries: dtypes={'x': <class 'float'>, 'name': 'string', 'id': <class 'int'>} seed=1132422555
| Projection: columns=['name', 'id', 'x']
P2PShuffle: partitioning_index='_partitions' npartitions_out=30 ignore_index=False options=None
Fused(f9ad4):
| AssignPartitioningIndex: partitioning_index=['name', 'id'] index_name='_partitions' npartitions_out=30
| Timeseries: dtypes={'name': 'string', 'id': <class 'int'>, 'x': <class 'float'>} seed=946678847
The optimized expression above is using a BlockwiseMerge that acts on two P2PShuffle expressions, but instead it should be using a HashJoin; see dask/distributed#7514 and dask/distributed#7496 (a shortcoming of the scheduler heuristics means that two P2P shuffles will not be scheduled efficiently; other binary ops can be affected similarly).
We have the ACA structure for this. There's now a lot of grunt work to bring over the algorithms.
Also! We need to figure out how we want to represent the intermediate Groupby object. That should be interesting.
Eventually we'll have all sorts of Exprs like arrays, bags, etc. Today we're mostly focused on dataframe and that's good I think.
However, even today we have non-DataFrame exprs like scalars, and the results of to_parquet calls. Maybe it's time to pull out some of the dataframe metadata assumptions like divisions and maybe even meta from the Expr class and make some new Frame class (or some better name) from which dataframe-like-things (dataframe, series, index) inherit.
import pandas as pd
from dask.dataframe import assert_eq
from dask_expr import from_pandas
df = pd.DataFrame({"a": [1, 2, 3], "bb": 1})
ddf = from_pandas(df)
assert_eq(ddf.max()["a"], df.max()["a"])
raises
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 134, in __getattr__
return object.__getattribute__(self, key)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/expr.py", line 841, in _meta
return self.frame._meta.index
^^^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/reductions.py", line 100, in _meta
meta = self.chunk(meta, **self.chunk_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/PycharmProjects/dask_dev/dask-expr/dask_expr/reductions.py", line 144, in chunk
out = cls.reduction_chunk(df, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/dask/utils.py", line 1105, in __call__
return getattr(__obj, self.method)(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11646, in max
return NDFrame.max(self, axis, skipna, numeric_only, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11185, in max
return self._stat_function(
^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/generic.py", line 11158, in _stat_function
return self._reduce(
^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/series.py", line 4666, in _reduce
return op(delegate, skipna=skipna, **kwds)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 158, in f
result = alt(values, axis=axis, skipna=skipna, **kwds)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 421, in new_func
result = func(values, axis=axis, skipna=skipna, mask=mask, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 1083, in reduction
values, mask, dtype, dtype_max, fill_value = _get_values(
^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/nanops.py", line 313, in _get_values
values = extract_array(values, extract_numpy=True) # type: ignore[assignment]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/construction.py", line 441, in extract_array
if isinstance(obj, (ABCIndex, ABCSeries)):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/dtypes/generic.py", line 44, in _instancecheck
return _check(inst) and not isinstance(inst, type)
^^^^^^^^^^^^
File "/Users/patrick/mambaforge/envs/dask-expr/lib/python3.11/site-packages/pandas/core/dtypes/generic.py", line 38, in _check
return getattr(inst, attr, "_typ") in comp
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
RecursionError: maximum recursion depth exceeded in comparison
We would like to send expressions up to the Dask scheduler, have them optimized and lowered there, and then enter the normal scheduler state. To do this I think that we need a new route on client/scheduler communication. Maybe that can be made more general and accept "something picklable that satisfies the .__dask_foo__ protocols" or something similar.
Eventually I'd love it if we held on to the Expr structure in the scheduler to inform future optimizations, but as a first pass I think that we probably just try to end up with the same state that the current update_graph route achieves.
@fjetter I think that this would benefit from someone on your team
I looked a bit into what an iloc implementation could look like (we need this for some of the more complicated reductions to adjust for potentially duplicated columns). I haven't gone down this rabbit hole terribly far, because I wanted to avoid doing something that you might have considered before.
Right now, Projection is label based, which works nicely but doesn't mesh very well with positional indexes. We could implement another class that does the same for positional column indexers, but this would require special casing every _simplify_up call for both of them (simply because df[positional_indexer] does not work). If we converted the columns given to Projection into positional indexes, we could mesh both of them together and generalise the _simplify_up steps.
As a side note, it also makes estimating things easier, since label-based selection won't tell you the dimensions of your Frame when columns are duplicated.
I avoided this in the initial implementation. This is not urgent in any way, just something to keep track of.
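For illustration, converting the labels handed to a Projection into positions could look roughly like the sketch below. `labels_to_positions` is a hypothetical helper, not part of dask-expr, and it works on a plain list of column labels to stay self-contained. With a pandas frame, the resulting positions could then be passed to `df.iloc[:, positions]`.

```python
def labels_to_positions(columns, labels):
    """Map column labels to positional indexes, keeping duplicated columns.

    Positions come back in the order of `labels`, and a label that occurs
    several times in `columns` contributes every matching position, which is
    what label-based selection does on a frame with duplicated columns.
    """
    if not isinstance(labels, list):
        labels = [labels]
    positions = []
    for label in labels:
        matches = [i for i, col in enumerate(columns) if col == label]
        if not matches:
            raise KeyError(label)
        positions.extend(matches)
    return positions


columns = ["a", "b", "a", "c"]
print(labels_to_positions(columns, ["a", "c"]))  # [0, 2, 3]
print(labels_to_positions(columns, "b"))         # [1]
```

The length of the returned list is also the width of the projected frame, which is the estimation benefit mentioned above.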
I had two conversations recently with @fjetter and @phofl about reasoning about dataframes. Often during different stages (expression building, optimization, task graph building) we want to know various attributes of our dataframe. Here are a few things people have asked for in the past:
Historically we relied on divisions to answer many of these questions, and we're still used to doing that. However, this isn't great, because divisions can change as we optimize (maybe we decide to repartition based on worker size, for example) and because divisions don't hold all of the information we want (like lengths of partitions).
Instead, I think that we need to get used to asking questions of expressions by either inspecting the current tree, or by using other expressions. Two examples:
@rjzamora added a Lengths expression, which computes the length of each partition, and knows how to optimize itself through Elemwise and ReadParquet calls so that, oftentimes, we can convert a Lengths(df) expression into a Literal.
This is an example of using an expression to answer a question about an existing expression.
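A toy version of that idea, to make it concrete. The classes below are stand-ins invented for this sketch (they are not the dask-expr classes, and `simplify_lengths` is a hypothetical name): lengths pass through row-preserving nodes and resolve to known values when the source already has them from file metadata.

```python
class Expr:
    def __init__(self, *operands):
        self.operands = operands


class FromMetadata(Expr):
    """Stand-in for a source like ReadParquet whose row counts are in metadata."""

    def __init__(self, partition_lengths):
        super().__init__()
        self.partition_lengths = tuple(partition_lengths)


class Elemwise(Expr):
    """Row-preserving operation: output partitions have the same lengths."""


class Filter(Expr):
    """Changes row counts, so lengths are unknown without computing."""


def simplify_lengths(expr):
    """Return partition lengths without computing, or None if that is impossible."""
    if isinstance(expr, FromMetadata):
        return expr.partition_lengths
    if isinstance(expr, Elemwise):
        # Lengths(Elemwise(x)) simplifies to Lengths(x).
        return simplify_lengths(expr.operands[0])
    return None


source = FromMetadata([3, 4])
print(simplify_lengths(Elemwise(Elemwise(source))))  # (3, 4)
print(simplify_lengths(Filter(source)))              # None
```

Returning `None` here corresponds to the case where the Lengths expression cannot be turned into a Literal and has to be computed.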
We could ask if two dataframes share the same partition alignment by asking if they have the same ancestors modulo Blockwise. For example the following are co-aligned:
df = read_parquet(...)
a = df.x[df.z > 1] + 10
b = df.y - df.z.sum()
assert are_co_aligned(a, b)
This is an example of looking at an expression tree to answer a question.
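One way `are_co_aligned` could be sketched on a toy expression tree. The node classes are made up for this example, and nodes are compared by object identity where a real implementation would more likely compare expression names. The rule is: walk down through Blockwise nodes, ignore scalar reductions (they are broadcast and carry no partitioning), and check that everything ends up at the same non-Blockwise ancestor.

```python
class Expr:
    blockwise = False
    scalar = False

    def __init__(self, *operands):
        self.operands = operands


class Source(Expr):
    """Stand-in for something like read_parquet."""


class Blockwise(Expr):
    blockwise = True


class Reduction(Expr):
    """Stand-in for a full reduction such as sum(); the result is a scalar."""
    scalar = True


class Shuffle(Expr):
    """Stand-in for any operation that changes partitioning."""


def partition_roots(expr):
    """Collect the non-Blockwise ancestors that determine expr's partitioning."""
    if expr.scalar:
        return set()
    if not expr.blockwise:
        return {expr}
    roots = set()
    for op in expr.operands:
        if isinstance(op, Expr):
            roots |= partition_roots(op)
    return roots


def are_co_aligned(*exprs):
    roots = set()
    for expr in exprs:
        roots |= partition_roots(expr)
    return len(roots) == 1


df = Source()
x, y, z = Blockwise(df, "x"), Blockwise(df, "y"), Blockwise(df, "z")
a = Blockwise(Blockwise(x, Blockwise(z, 1)), 10)  # df.x[df.z > 1] + 10
b = Blockwise(y, Reduction(z))                    # df.y - df.z.sum()
print(are_co_aligned(a, b))            # True
print(are_co_aligned(a, Shuffle(df)))  # False
```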
Similar to above we might infer that a column is sorted if it had previously been the target of a set_index or sort_values call, and was only operated on by monotonic expressions (we'd have to add this to known monotonic expressions).
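A simplified sketch of that inference, using invented node classes and a hypothetical `is_sorted_by` helper. It assumes a linear chain of single-input expressions and a hand-maintained tuple of monotonic operations, so it shows the shape of the check rather than a complete rule.

```python
class Expr:
    def __init__(self, frame=None, column=None):
        self.frame = frame    # the single input expression, if any
        self.column = column  # the column this operation targets, if any


class Source(Expr):
    """Stand-in for an input with no ordering guarantees."""


class SortValues(Expr):
    """Sorts by self.column."""


class SetIndex(Expr):
    """Sets (and sorts by) self.column."""


class AddConstant(Expr):
    """x + c keeps the ordering of x, so it is monotonic."""


class Abs(Expr):
    """abs(x) can reorder values, so it is not monotonic."""


SORTING = (SortValues, SetIndex)
MONOTONIC = (AddConstant,)


def is_sorted_by(expr, column):
    """Walk towards the root: sorted if only monotonic ops follow a sort on column."""
    while expr is not None:
        if isinstance(expr, SORTING):
            return expr.column == column
        if not isinstance(expr, MONOTONIC):
            return False
        expr = expr.frame
    return False


sorted_x = SortValues(Source(), "x")
print(is_sorted_by(AddConstant(sorted_x), "x"))  # True
print(is_sorted_by(Abs(sorted_x), "x"))          # False
```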
General principles:
Dask-match certainly makes most high-level optimizations simpler than they would be with Dask's current HLG design. However, it is not yet clear to me how task fusion should come into play. There seem to be three general paths forward:
dask-match to enable the fusion of "blockwise" parent and child expressions.
_layer methods to output Blockwise HLG layers instead of low-level dictionaries.
I like that (1) allows us to keep the logic in dask-match a bit simpler, but I'm not very confident that this approach will deliver the same benefits as (2) or (3). (3) is clearly the "fastest" way to support task fusion, but we will eventually need to migrate the necessary subgraph-callable logic into the Expr version of Blockwise if the long-term plan is to scrap HLGs altogether.
We're working to fill out the DataFrame API here. Several important operations work, but there's still a lot of API surface not covered. One option would be to fall back to the existing dask.dataframe implementation when a dask-expr method doesn't exist.
This is just one possible option. I don't think we should focus on this at this stage. Just logging the idea.
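To make the idea concrete, here is a minimal sketch of such a fallback with stand-in classes. `NewSeries`, `LegacySeries`, and `to_legacy` are hypothetical names for this example; nothing here is the dask-expr or dask.dataframe API. The state lives in an underscore-prefixed attribute and `__getattr__` refuses underscore names, so a missing attribute cannot send the lookup into infinite recursion.

```python
import statistics
import warnings


class LegacySeries:
    """Stand-in for the existing, more complete implementation."""

    def __init__(self, values):
        self._values = list(values)

    def sum(self):
        return sum(self._values)

    def std(self):
        return statistics.stdev(self._values)


class NewSeries:
    """Stand-in for the new implementation with partial API coverage."""

    def __init__(self, values):
        self._values = list(values)

    def sum(self):
        return sum(self._values)

    def to_legacy(self):
        return LegacySeries(self._values)

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails.
        if key.startswith("_"):
            raise AttributeError(key)
        try:
            attr = getattr(self.to_legacy(), key)
        except AttributeError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            ) from None
        warnings.warn(f"{key!r} is not implemented yet; using the legacy path")
        return attr


s = NewSeries([1, 2, 3, 4])
print(s.sum())  # 10, handled natively
print(s.std())  # computed by LegacySeries.std, with a warning
```

Emitting a warning on each fallback keeps it visible which methods still take the legacy path, and therefore skip any expression-level optimization.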
Is this expected behavior?
Repro:
import dask_expr as dx
df = dx.datasets.timeseries(dtypes={"name": "string", "id": int, "quantity": int, "spend": float})
df["quantity"].std()
Stacktrace:
Traceback (most recent call last):
File "lib/python3.9/site-packages/dask_expr/_collection.py", line 175, in __getattr__
val = getattr(self.expr, key)
File "lib/python3.9/site-packages/dask_expr/_expr.py", line 148, in __getattr__
raise err
File "lib/python3.9/site-packages/dask_expr/_expr.py", line 137, in __getattr__
return object.__getattribute__(self, key)
AttributeError: 'Projection' object has no attribute 'std'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "lib/python3.9/site-packages/dask_expr/_collection.py", line 181, in __getattr__
raise err
File "lib/python3.9/site-packages/dask_expr/_collection.py", line 170, in __getattr__
return object.__getattribute__(self, key)
AttributeError: 'Series' object has no attribute 'std'
They somehow pull a lot of data onto one worker.
test_q3 from coiled benchmarks is a reproducer, simpler:
from dask_expr import read_parquet

uri = "s3://coiled-datasets/h2o-benchmark/N_1e8_K_1e2_parquet/*.parquet"
ddf = read_parquet(uri, engine="pyarrow", storage_options={"anon": True})
ddf = ddf[["id3", "v1", "v3"]]
(
    ddf.groupby("id3", dropna=False, observed=True)[["v1"]].sum()
    # Swapping the line above for the agg call below kills the worker:
    #.agg({"v1": "sum", "v3": "mean"})
    .compute()
)
Column selection followed by the aggregation works but takes forever; using agg kills the worker. This is with 30 workers (which is way more than we'd normally need).
I used the divisions to identify whether or not 2 frames are aligned if this was necessary for a given operation.
A different approach, that is potentially cheaper:
Both frames are aligned as long as we can find a common root for both of them and we only pass through Blockwise operations while going through the expression tree. This can be used as a short-cut to figure out whether or not 2 frames are aligned without accessing any metadata.
Other cases will still require a repartitioning call.