Dask is a flexible parallel computing library for analytics. See documentation for more information.
New BSD. See License File.
Caching based on computation time and storage space
License: BSD 3-Clause "New" or "Revised" License
@jrbourbeau and I are in the process of moving the default branch for this repo from master to main.
Once the name on GitHub is changed (the first box above is checked, or this issue is closed), when you try to git pull you'll get:
Your configuration specifies to merge with the ref 'refs/heads/master'
from the remote, but no such ref was fetched.
First, head to your fork and rename the default branch there. Then run:
git branch -m master main
git fetch origin
git branch -u origin/main main
Food for thought. Most of these are really "Pandas things" and not "Dask things", but I'm kind of riffing here.
For a dask bag, what if we cached bag.filter(lambda x: x > 2).map(lambda x: x + 1)? Dask could cache the filter and the map in case either is ever called again with that particular lambda. The following could be a terrible idea. Still riffing:

import inspect
import hashlib

m = hashlib.md5()
foo = lambda x: x > 2
stringified = inspect.getsource(foo)
m.update(stringified.encode())  # md5 needs bytes, not str
m.digest()
Now you have a checksum on that particular lambda function that you can use as a key for that calc on that bag.
For a Dask DataFrame, what if we cached applys or maps on the DataFrame (which tend to be terribly expensive)? Same hashing of the function passed to the apply or map, then caching of the applied or mapped columns or the entire frame.

In general, I think there could be huge wins in essentially taking snapshots of a given series of computations on any given sequence or DataFrame, and being able to reconstruct those from caches of both intermediate and final results.
This could actually provide some compelling data flows on the distributed side of dask. If any given computation node fails, you could dramatically decrease the time it takes to re-run a given sequence of computations if you had all of those intermediate steps cached along the way. So for any given branch of the graph, if you've finished 5 of the 6 pieces of computation and the node fails, either you can wait for the node to stand back up and just finish step 6, or you can shuffle the intermediate cached result from step 5 to another node and finish step 6. Moving that intermediate is going to be way less expensive than moving the whole data block. This does, of course, assume you get the data from the failed node into storage where a new node can access it.
End of riff.
The current behavior seems to replace the value of the key but increase total_bytes without retiring the existing item.
I do not think this behavior is well defined. It causes total_bytes and available_bytes to diverge, and eventually triggers #15.
How unique do we expect the keys to be?
Shall we raise an exception when the same key is used twice, or shall we consistently retire the existing key/value when a key is reused?
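A minimal sketch of the "consistently retire" option (TinyCache is a stand-in for illustration, not cachey code): subtract the old entry's bytes before accounting for the new one, so total_bytes cannot drift when a key is reused:

```python
class TinyCache:
    def __init__(self, available_bytes):
        self.available_bytes = available_bytes
        self.total_bytes = 0
        self.data = {}
        self.nbytes = {}

    def put(self, key, value, nbytes):
        # Retire the existing entry's accounting first, so reusing a
        # key replaces its byte count instead of double-counting it.
        if key in self.data:
            self.total_bytes -= self.nbytes[key]
        self.data[key] = value
        self.nbytes[key] = nbytes
        self.total_bytes += nbytes

c = TinyCache(100)
c.put('x', b'abc', 3)
c.put('x', b'abcdef', 6)
assert c.total_bytes == 6  # not 3 + 6 = 9
```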
How can I specify a cache file/location?
When attempting to group by two or more keys, I get an AttributeError. This does not occur when grouping by a single key:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-11-c4a4f7e033ab> in <module>
----> 1 df.groupby(['path','time']).alt.mean().compute()
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(self, **kwargs)
154 dask.base.compute
155 """
--> 156 (result,) = compute(self, traverse=False, **kwargs)
157 return result
158
~/anaconda3/lib/python3.6/site-packages/dask/base.py in compute(*args, **kwargs)
396 keys = [x.__dask_keys__() for x in collections]
397 postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398 results = schedule(dsk, keys, **kwargs)
399 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
400
~/anaconda3/lib/python3.6/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
74 results = get_async(pool.apply_async, len(pool._pool), dsk, result,
75 cache=cache, get_id=_thread_get_id,
---> 76 pack_exception=pack_exception, **kwargs)
77
78 # Cleanup pools associated to dead threads
~/anaconda3/lib/python3.6/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
465 finish_task(dsk, key, state, results, keyorder.get)
466 for f in posttask_cbs:
--> 467 f(key, res, dsk, state, worker_id)
468
469 while state['ready'] and len(state['running']) < num_workers:
~/anaconda3/lib/python3.6/site-packages/dask/cache.py in _posttask(self, key, value, dsk, state, id)
59 duration += max(self.durations.get(k, 0) for k in deps)
60 self.durations[key] = duration
---> 61 nb = self._nbytes(value) + overhead + sys.getsizeof(key) * 4
62 self.cache.put(key, value, cost=duration / nb / 1e9, nbytes=nb)
63
~/anaconda3/lib/python3.6/site-packages/cachey/nbytes.py in nbytes(o)
27
28 if name == 'pandas.core.series.Series':
---> 29 return _array(o._data.blocks[0].values) + _array(o.index._data)
30 elif name == 'pandas.core.frame.DataFrame':
31 return _array(o.index) + sum([_array(blk.values)
~/anaconda3/lib/python3.6/site-packages/cachey/nbytes.py in _array(x)
3
4 def _array(x):
----> 5 if x.dtype == 'O':
6 return sys.getsizeof('0'*100) * x.size
7 elif str(x.dtype) == 'category':
AttributeError: 'NoneType' object has no attribute 'dtype'
Is this a known problem? Or a bug?
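It looks like a bug rather than expected behavior: grouping by two keys yields a Series with a MultiIndex, and the size estimate in cachey's nbytes peeks at o.index._data, which does not behave the same way for a MultiIndex. A hedged workaround sketch (pandas_nbytes is a made-up name) that leans on pandas' own accounting instead of private attributes:

```python
import pandas as pd

def pandas_nbytes(obj):
    # Hypothetical replacement for cachey's estimate: let pandas report
    # its own footprint, which handles MultiIndex and object dtypes.
    if isinstance(obj, pd.Series):
        return int(obj.memory_usage(index=True, deep=True))
    if isinstance(obj, pd.DataFrame):
        return int(obj.memory_usage(index=True, deep=True).sum())
    raise TypeError(f"unsupported type: {type(obj)}")

# A Series shaped like the groupby(['path', 'time']) result above:
idx = pd.MultiIndex.from_tuples([("a", 1), ("b", 2)], names=["path", "time"])
s = pd.Series([10.0, 20.0], index=idx, name="alt")
assert pandas_nbytes(s) > 0
```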
Hello Fellows,
I am looking at using dask for distributed computation, and I was wondering: what is your strategy for turning cachey into a horizontally scalable cache and common calculation space?
Regards,
Alex
Due to changes in the Travis CI billing, the Dask org is migrating CI to GitHub Actions.
This repo contains a .travis.yml file which needs to be replaced with an equivalent .github/workflows/ci.yml file.
See dask/community#107 for more details.
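For reference, a minimal workflow sketch of the kind of file that could replace .travis.yml. The job name, Python versions, and test command here are assumptions, not the final config:

```yaml
# .github/workflows/ci.yml -- hypothetical minimal equivalent
name: CI
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.8", "3.9", "3.10"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - run: pip install -e . pytest
      - run: pytest
```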
I've just started to use cachey (it's quite cool, so thanks for making it). I'm curious whether it would be possible to extend the cache beyond in-memory caching, perhaps using Zict or similar mutable mappings.
Currently Cache.data is just a vanilla dict (Line 71 in 382e55e), and I'm wondering if it could be any dict-like thing. A little monkey-patch exploration makes this seem possible. Here I replace Cache.data with Zict's LRU mapping:
In [1]: >>> from cachey import Cache
...:
...: >>> c = Cache(1e9, 1) # 1 GB, cut off anything with cost 1 or less
In [2]: c.data
Out[2]: {}
In [3]: import pickle
...: import zlib
...:
...: from zict import File, Func, LRU
...:
...: a = File('myfile/', mode='a')
...: b = Func(zlib.compress, zlib.decompress, a)
...: c = Func(pickle.dumps, pickle.loads, b)
...: d = LRU(100, c)
In [4]: d
Out[4]: <LRU: 0/100 on <Func: dumps<->loads <Func: compress<->decompress <File: myfile/, mode="a", 0 elements>>>>
In [5]: c = Cache(1e9)
# monkey patch here!
In [6]: c.data = d
In [7]: c
Out[7]: <cachey.cache.Cache at 0x7fc7c05ed6d8>
In [8]: c.data
Out[8]: <LRU: 0/100 on <Func: dumps<->loads <Func: compress<->decompress <File: myfile/, mode="a", 0 elements>>>>
In [9]: c.put('x', 'some value', cost=3)
In [10]: c.get('x')
Out[10]: 'some value'
Thoughts on what I've done here? What I'm really going for is a Zict.Buffer with a fast dict and a slower datastore (database or object store).
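To make the Buffer idea concrete without depending on zict, here is a pure-Python sketch of the same pattern (TinyBuffer is a made-up stand-in): keep up to n items in a fast mapping and spill the oldest into a slow mapping, which could be any dict-like store:

```python
from collections import OrderedDict

class TinyBuffer:
    """Minimal sketch of the zict.Buffer idea: a fast in-memory tier
    of at most `n` items backed by an arbitrary slow mapping."""

    def __init__(self, n, slow):
        self.n = n
        self.fast = OrderedDict()
        self.slow = slow  # any dict-like store (file, database, ...)

    def __setitem__(self, key, value):
        self.fast[key] = value
        while len(self.fast) > self.n:
            # Spill the oldest fast entry down to the slow tier.
            k, v = self.fast.popitem(last=False)
            self.slow[k] = v

    def __getitem__(self, key):
        if key in self.fast:
            return self.fast[key]
        return self.slow[key]

buf = TinyBuffer(2, slow={})
for i in range(4):
    buf[i] = i * 10
assert list(buf.fast) == [2, 3]      # newest two stay in memory
assert buf.slow == {0: 0, 1: 10}     # oldest two spilled
assert buf[0] == 0                   # reads fall through to slow tier
```

Swapping the plain dict for a zict.File or compressed mapping would then give exactly the fast-dict-over-slow-datastore layering described above.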
Hello,
I have found this project very useful, and while running some tests I came across behavior that I did not expect.
I leave below a minimal reproducible example:
import pandas as pd
import cachey

df = pd.DataFrame([[1, 2]], columns=list("ab"))
c = cachey.Cache(1e9)

@c.memoize
def boo(df):
    print("not cached!")
    return df, df.to_dict()

boo(df=df)  # does not work
boo(df)     # works
boo(df)     # works
I don't know if this comment is appropriate, but I thought it was interesting; maybe you can give me a clue as to what could cause this problem.
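One plausible cause, assuming the memoize key folds keyword arguments into a frozenset(kwargs.items()): building that frozenset hashes every (name, value) pair up front, and a DataFrame is unhashable, whereas a tuple of positional args can be constructed without hashing its elements. A minimal sketch of the asymmetry (this reproduces the hashing behavior, not cachey's actual key function):

```python
import pandas as pd

df = pd.DataFrame([[1, 2]], columns=list("ab"))

# Building a tuple of positional args never hashes its elements:
key_args = ("boo", (df,))

# ...but frozenset(kwargs.items()) must hash each pair immediately,
# and pandas refuses to hash a mutable DataFrame:
try:
    frozenset({"df": df}.items())
    raised = False
except TypeError:
    raised = True

assert raised
```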
cross-posting from ipycache:
This is somewhat related to both #5 and #3 but slightly different. Basically I would like a persistent cache for use when working in a jupyter notebook. The motivation is very similar to https://github.com/rossant/ipycache i.e. if I restart a notebook I don't want to have to repeat any computations that previously finished. However I would like to use Zarr to store cache results not pickle because compression will save disk space. Also I would like to use a memoize function decorator rather than a cell magic, i.e., something more like the cachey memoize decorator and the joblib Memory.cache decorator.
No problem if this is beyond scope for cachey but I thought I'd mention it in case there were any synergies with other requirements. On the technical side there are two main points to consider: one is how to generate a key from function arguments that is stable across python sessions (i.e., doesn't rely on Python's built-in hash function); the second is how to integrate with Zarr (or similar) for storage.
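On the first technical point, a minimal sketch of a session-stable key (stable_key is a made-up name): hash a canonical rendering of the arguments with hashlib, which is deterministic across Python sessions, unlike built-in hash with per-process randomization:

```python
import hashlib
import json

def stable_key(func_name, *args, **kwargs):
    # Hypothetical: derive a key by hashing a canonical JSON rendering
    # of the call. default=str is a crude fallback for non-JSON args;
    # real array arguments would need a proper content hash.
    payload = json.dumps([func_name, args, sorted(kwargs.items())],
                         default=str)
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = stable_key("myfunc", 1, 2, mode="a")
k2 = stable_key("myfunc", 1, 2, mode="a")
assert k1 == k2                          # stable within and across sessions
assert stable_key("myfunc", 1, 3) != k1  # different args, different key
```

Such a key could then name a Zarr group holding the cached result.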
Cachey produces an unhelpful div/zero error when a cached function returns a zero-length array.
Examples:
from cachey import Cache
import numpy as np
import pandas as pd

cache = Cache(1e5)

@cache.memoize
def myfunc(x):
    return x

try:
    myfunc(pd.Series())
except ZeroDivisionError as m:
    print(m)

try:
    myfunc(np.array([]))
except ZeroDivisionError as m:
    print(m)

myfunc([])  # lists work fine

Output:

float division by zero
float division by zero
Out[39]: []
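The division presumably comes from scoring cost per byte; a zero-length Series or array has an nbytes estimate of 0. A hedged one-line guard (scorer_cost is a made-up name; the real fix would go wherever cachey divides by nbytes):

```python
def scorer_cost(cost, nbytes):
    # Hypothetical guard: treat an empty result as occupying at least
    # one byte so the cost-per-byte score never divides by zero.
    return cost / max(nbytes, 1)

assert scorer_cost(3.0, 0) == 3.0  # empty result no longer raises
assert scorer_cost(3.0, 6) == 0.5  # normal results score as before
```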
Hello,
I'm having trouble during the installation:
kepler@nebula:~$ sudo pip install cachey
[sudo] password for kepler:
Downloading/unpacking cachey
Downloading cachey-0.0.1.tar.gz
Running setup.py (path:/tmp/pip-build-Kfuq02/cachey/setup.py) egg_info for package cachey
Traceback (most recent call last):
File "<string>", line 17, in <module>
File "/tmp/pip-build-Kfuq02/cachey/setup.py", line 15, in <module>
install_requires=list(open('requirements.txt').read().strip().split('\n')),
IOError: [Errno 2] No such file or directory: 'requirements.txt'
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 17, in <module>
File "/tmp/pip-build-Kfuq02/cachey/setup.py", line 15, in <module>
install_requires=list(open('requirements.txt').read().strip().split('\n')),
IOError: [Errno 2] No such file or directory: 'requirements.txt'
Thank you.
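The failure is setup.py opening requirements.txt, which apparently was not shipped in the 0.0.1 sdist. Two likely fixes: add `include requirements.txt` to MANIFEST.in, or guard the read as sketched below (a hypothetical snippet, not the project's actual setup.py):

```python
import os

# Hypothetical setup.py excerpt: tolerate a missing requirements.txt
# instead of crashing during `pip install` from an incomplete sdist.
if os.path.exists('requirements.txt'):
    with open('requirements.txt') as f:
        install_requires = [line for line in f.read().strip().split('\n')
                            if line]
else:
    install_requires = []
```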