Dask is a flexible parallel computing library for analytics. See documentation for more information.
New BSD. See License File.
License: BSD 3-Clause "New" or "Revised" License
Dask is a flexible parallel computing library for analytics. See documentation for more information.
New BSD. See License File.
Currently the stepsize computation in proximal_grad
is still embedded within the code, which makes it hard to manage it with Dask. If the proximal_grad
method is useful then it would be nice to pull this function out.
cc @moody-marlin
Code:
from dask_ml.linear_model import LinearRegression as DLR
from dask_ml.datasets import make_regression
X, y = make_regression(n_samples = 500_000, n_features = 60, chunks=10000)
dask_model = DLR(fit_intercept=False)
DLR.fit(X, y)
anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/dask_ml/linear_model/glm.py in fit(self, X, y)
181 self : objectj
182 """
--> 183 X = self._check_array(X)
184
185 solver_kwargs = self._get_solver_kwargs()
AttributeError: 'Array' object has no attribute '_check_array'
package versions
dask 2.6.0 py_0 conda-forge
dask-core 2.6.0 py_0 conda-forge
dask-glm 0.2.0 py_1 conda-forge
dask-jobqueue 0.6.3 py_0 conda-forge
dask-ml 1.0.0 py_1 conda-forge
Hi all,
I am a second year PhD student in mechanical engineering at UC Davis studying numeric methods for optimal control. I just learned about dask and I'm excited about using it for the types of problems I do in my research. I think it might already, or be very close to, be a really great way to increase performance for common numeric methods in scientific computing.
In general, it seems that if an implementation of a method that maximizes the number of "sequential" computations between control flow structures and using concurrent futures for control flows (like error checks) would get significant speed boosts since the dask scheduler optimizes and distributes the computational graph. For example, if one were to daskify an algorithm like a Runge-Kutta ODE solver and all the derivative callbacks, the scheduler should automatically perform the equivalent of common sub-expression collection and the parallelization of inter-step computations. Then dask-based higher level logic (such as comparing the results of multiple simulations) or algorithms (like shooting methods for solving boundary value problems) could add the lower-level computation tasks to the scheduler so computation could be processed in parallel without the user having to choose one level to parallelize at.
This leads to some questions:
Thanks,
Ben
Related issues:
dask/dask#2225
#3
#5
#57
P.S. I'm posting in this repo because more of the related issues that I could find were here; I'd be happy to close this and re-post in another repository if it would be more appropriate. Just let me know.
Fun fact: the current ADMM implementation can spend almost half of it's time computing np.exp
from dask import persist
import dask.array as da
import numpy as np
from dask_glm.logistic import admm
from dask_glm.utils import make_y
N = 1e7
M = 2
chunks = 1e6
seed = 20009
X = da.random.random((N, M), chunks=(chunks, M))
y = make_y(X, beta=np.array(list(range(M))), chunks=chunks)
X, y = persist(X, y)
%%prun
import dask
with dask.set_options(get=dask.get):
beta = admm(X, y)
I use a wrapped version of np.exp
just so that it shows up in profile results.
565331 function calls (532847 primitive calls) in 93.202 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
2254 41.543 0.018 41.543 0.018 utils.py:25(exp)
1127 30.614 0.027 52.086 0.046 logistic.py:349(logistic_loss)
1127 13.122 0.012 40.358 0.036 logistic.py:364(logistic_gradient)
1127 6.267 0.006 27.191 0.024 logistic.py:343(sigmoid)
1130 0.793 0.001 0.793 0.001 {method 'reduce' of 'numpy.ufunc' objects}
8 0.078 0.010 0.079 0.010 executionengine.py:100(finalize_object)
1127 0.052 0.000 40.409 0.036 logistic.py:373(proximal_logistic_gradient)
1127 0.049 0.000 52.135 0.046 logistic.py:358(proximal_logistic_loss)
8 0.043 0.005 0.043 0.005 passmanagers.py:94(run)
30 0.034 0.001 93.040 3.101 lbfgsb.py:205(_minimize_lbfgsb)
2254 0.031 0.000 41.582 0.018 dispatcher.py:152(__call__)
1127 0.029 0.000 92.998 0.083 lbfgsb.py:277(func_and_grad)
26957/12971 0.027 0.000 0.083 0.000 {method 'format' of 'str' objects}
1127 0.021 0.000 52.417 0.047 optimize.py:290(function_wrapper)
80776 0.019 0.000 0.030 0.000 {built-in method isinstance}
1127 0.013 0.000 0.814 0.001 fromnumeric.py:1743(sum)
Here are some comments from @stuartarchibald
How many exp() are you doing at a time?
If you are doing many, are the input values of similar magnitude?
Do you care about IEEE754 correctness over NaN and Inf?
When I've deal with this previously, most of the time spent in a loop over exp() was in a) feraiseexcept or similar dealing with inf/nan b) branch misprediction, the split points are usually f(log_e(2)) IIRC.
a) goes away if you don't care
b) goes away if you can guarantee a range or aren't too bothered about inaccuracy out of range. Remez algs are usually used to compute the coefficient table for a polynomial and a bunch of shifts are done to get values into appropriate range.
c) if you have a load of values to compute, perhaps try Intel VML?
We need to determine a robust testing framework for our logistic regression algorithms, starting with a review of the accuracy of the current ADMM code.
Unregularized Problems
y.sum() = sigmoid(X.dot(beta)).sum()
Regularized Problems
On my local machine, bfgs()
is creating a segmentation fault anytime I try to run it.
import dask.array as da
from dask_glm.utils import make_y
from dask_glm.logistic import bfgs
X = da.random.random((100,2), chunks=(20,2))
y = make_y(X)
bfgs(X, y)
yields:
//anaconda/lib/python3.5/site-packages/dask/array/core.py:464: RuntimeWarning: overflow encountered in true_divide
o = func(*args, **kwargs)
Segmentation fault: 11
Possibly related to dask/dask#1927 ?
On eight m4.2xlarges I created the following dataset
N = 1e8
beta = np.array([-1, 0, 1, 2])
M = 4
chunks = 1e6
seed = 20009
X = da.random.random((N, M), chunks=(chunks, M))
z0 = X.dot(beta)
y = da.random.random(z0.shape, chunks=z0.chunks) < sigmoid(z0)
X, y = persist(X, y)
I then ran the various methods within this project and recorded the profiles as bokeh plots. They are linked to below:
Additionally, I ran against a 10x larger dataset and got the following results
Most runtimes were around a minute. The BFGS solution gave wrong results.
On larger problems with smallish chunks (8 * 4 * 1e6 == 24 MB) we seem to be bound by scheduling overhead. I've created an isolated benchmark here that is representative of this overhead: https://gist.github.com/mrocklin/48b7c4b610db63b2ee816bd387b5a328
Not sure if this link is public: https://readthedocs.org/projects/dask-glm/builds/5741657, but we at least need s3fs. No time to look atm.
I started trying to play with @hussainsultan 's ADMM code. However, it requires many parameters for which I do not know good values. Can we add sane defaults there? Do we know sane defaults?
Short term it would be useful to me to always be able to do the following for all solver routines:
result = func(X, y)
Due to changes in the Travis CI billing, the Dask org is migrating CI to GitHub Actions.
This repo contains a .travis.yml
file which needs to be replaced with an equivalent .github/workflows/ci.yml
file.
See dask/community#107 for more details.
A few shape issues that arose in #2 :
y
arrays of shape (X.shape[0], )
, possibly by simply adding the line y = y[:, None]
before restructuring too much of the code.X, y
arrays that are not split into 5 chunks, but an arbitrary number of chunks / arbitrary chunk distribution.So far all of our solvers are synchronous. They compute full results in lock-step, for example switching between performing a parallel mat-vec, then doing a line search, and then doing a mat-vec again. These algorithms are common on single-machine hardware but may not be ideal for larger clusters.
The distributed scheduler provides some decent capabilities for full asynchronous computing, which may open us up to new algorithms. Are there asynchronous variants to some of these algorithms that may interest us?
Quick example of asynchronous code:
data_futures = client.map(load_chunk, chunks)
params = {...}
futures = client.map(compute_update, random.sample(100, data_futures), **params)
ac = as_completed(futures) # collection of running futures that yield in order of completion
for future in ac:
update_info, score = ac.result()
if is_good(score):
break
update_params(params, update_info)
new_future = client.submit(compute_update, random.choice(data_futures), **params)
ac.add(new_future)
@hussainsultan @mcg1969 @jcrist @moody-marlin
Currently requirements.txt specifies exact versions. This makes it very hard to use dask-glm with other libraries.
We should probably use >=
modifiers instead and choose older versions of libraries if possible.
This is a summary of an e-mail between myself and @MLnick
Taking the simplest version of say logistic regression, the basic idea is to split up the parameter vector (beta) itself into chunks (so it could be a dask array potentially, or some other distributed dask datastructure). Training would then iterate over "mini-batches" using SGD (let's say each mini batch is a chunk of the X array). In each mini batch, the worker will "pull" the latest version of "beta" from the Parameter Server and compute the (local) gradient for the batch. The worker then sends this gradient to the PS, which then performs the update (i.e. update its part of "beta" using say the gradient update from the worker and the step size). The next iteration then proceeds in the same way. This can be sync or async (but typically is either fully async or "bounded stale" async).
The key is to do this effectively as direct communication from the worker doing the mini batch gradient computation, to the worker holding the parameters (the "parameter server"), without involving the master ("client" app) at all, and to only "pull" and "push" the part of beta required for local computation (due to sparsity this doesn't need to be the full beta in many cases). In situations where the data is very sparse (e.g. like the Criteo data) the communication is substantially reduced in this approach. And the model size can be scaled up significantly (e.g. for FMs the model size can be very large).
This is slightly different from the way say L-BFGS works currently (and the way I seem to understand ADMM works in dask-glm) - i.e. that more or less a set of local computations are performed on the distributed data on the workers, and the results collected back to the "master", where an update step is performed (using LBFGS or the averaging of ADMM, respectively). This is also the way Spark does things.
What I'm struggling with is quite how to achieve the PS approach in dask. It seems possible to do it in a few different ways, e.g. perhaps it's possible just using simple distributed dask arrays, or perhaps using "worker_client" and/or Channels. The issue I have is how to let each worker "pull" the latest view of "beta" in each iteration, and how to have each worker "push" its local gradient out to update the "beta" view, without the "master" being involved.
I'm looking into the async work in http://matthewrocklin.com/blog/work/2017/04/19/dask-glm-2 also to see if I can do something similar here.
First, there are two nodes that you might consider the "master", the scheduler and the client. This is somewhat of a deviation from Spark, where they are both in the same spot.
Second, what are your communication and computation requirements? A roundtrip from the client to scheduler to worker to scheduler to client takes around 10ms on a decent network. A worker-worker communication would be shorter, definitely, but may also involve more technology. We can do worker-to-worker direct, but I wanted to make sure that this was necessary.
Channels currently coordinate metadata through the scheduler. They work a bit like this:
So there are a few network hops here, although each should be in the millisecond range (I think?).
We could also set up a proper parameter server structure with single hop communicatinos. Building these things isn't hard. As usual my goal is to extract from this experiment something slightly more general to see if we can hit a broader use case.
So I guess my questions become:
The PS idea is very simple at the high level. The "parameter server" can be thought of as a "distributed key-value store". There could be 1 or more PS nodes (the idea is precisely to allow scaling the size of model parameters across multiple nodes, such as in the case of factorization machines, neural networks etc).
A good reference paper is https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf
So in theory, at the start of an iteration, a worker node asks the PS for only the parameters it needs to compute its update (in sparse data situations, this might only be a few % of the overall # features, per "partition" or "batch"). This can be thought of as a set of (key, value) pairs where the keys are vector indices and the values are vector values at the corresponding index, of the parameter vector. In practice, each PS node will hold a "slice" of the parameter vector (the paper uses a chord key layout for example), and will work with vectors rather than raw key-value pairs, for greater efficiency.
It seems like Channels might be a decent way to go about this. Yes, there is some network comm overhead but in practice for a large scale problem, the time to actually send the data (parameters and gradients say) would dominate the few ms of network hops. This cost could also be partly hidden through async operations.
The way I thought about it with Channels, which you touch on is:
To answer your specific questions:
So here is some code just to get things started off:
def parameter_server():
beta = np.zeros(1000000)
with worker_client() as c:
betas = c.channel('betas', maxlen=1)
future_beta = c.scatter(beta)
betas.append(future_beta)
updates = c.channel('updates')
for update in updates:
beta = modify(beta, update)
future_beta = c.scatter(beta)
betas.append(future_beta)
def worker(idx, x):
with worker_client(separate_thread=False) as c:
betas = c.channel('betas', maxlen=1)
last_beta = betas.data[-1]
subset_beta = c.submit(operator.getitem, last_beta, idx).result()
params = subset_beta.result()
update = create_update(x, params)
updates = c.channel('updates')
updates.append(update)
updates.flush()
For what it's worth I expect this code to fail in some way. I think that channels will probably have to be slightly modified somehow. For example currently we're going to record all of the updates that have been sent to the updates channel. We need to have some way of stating that a reference is no longer needed. Channels need some mechanism to consume and destroy references to futures safely.
What is the best way to handle intercepts?
Right now, the algorithms assume the user creates a column of 1
s in their dask array, à la statsmodels
. However, sometimes it's convenient to have a fit_intercept
option similar to scikit-learn
. Having this option set to True
will require a step which appends a column of 1's to the user-supplied dask array, but it won't be as simple as the corresponding numpy
case.
After running either pip install dask-glm
or conda install dask-glm
the dask-glm-bench notebook fails to import the line from dask_glm.algorithms import proximal_grad, admm
with a ContextualVersionConflict error:
ContextualVersionConflict Traceback (most recent call last)
<ipython-input-11-ccb1387c99f5> in <module>()
2 import dask.array as da
3 import numpy as np
----> 4 from dask_glm.algorithms import proximal_grad, admm
5 from dask_glm.utils import make_y
/opt/anaconda/lib/python3.5/site-packages/dask_glm/__init__.py in <module>()
1 from pkg_resources import get_distribution, DistributionNotFound
2 try:
----> 3 __version__ = get_distribution(__name__).version
4 except DistributionNotFound:
5 pass
/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in get_distribution(dist)
555 dist = Requirement.parse(dist)
556 if isinstance(dist, Requirement):
--> 557 dist = get_provider(dist)
558 if not isinstance(dist, Distribution):
559 raise TypeError("Expected string, Requirement, or Distribution", dist)
/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in get_provider(moduleOrReq)
429 """Return an IResourceProvider for the named module or requirement"""
430 if isinstance(moduleOrReq, Requirement):
--> 431 return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
432 try:
433 module = sys.modules[moduleOrReq]
/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in require(self, *requirements)
966 included, even if they were already activated in this working set.
967 """
--> 968 needed = self.resolve(parse_requirements(requirements))
969
970 for dist in needed:
/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in resolve(self, requirements, env, installer, replace_conflicting)
857 # Oops, the "best" so far conflicts with a dependency
858 dependent_req = required_by[req]
--> 859 raise VersionConflict(dist, req).with_context(dependent_req)
860
861 # push the new requirements onto the stack
ContextualVersionConflict: (multipledispatch 0.4.8 (/opt/anaconda/lib/python3.5/site-packages), Requirement.parse('multipledispatch>=0.4.9'), {'dask-glm'})
Both pip freeze
and conda list
show that multipledispatch version 0.4.9 is installed satisfying the dask-glm requirement, so I am not sure what to do here.
For the past few weeks, I've been working on issues related to NEP-18 support for Dask (and ultimately, Dask-GLM as well) to allow CuPy to be used as a backend. I've made some progress, which I'll soon share with more details.
Among the issues I have now is the usage of the following Dask-GLM algorithms with CuPy:
newton
: requires numpy.linalg.lstsqadmm
and lbfgs
: require scipy.optimize.fmin_l_bfgs_bBoth of these functions are not implemented in CuPy. So I'm wondering if anybody knows whether any of these functions have some open and reliable CUDA implementation that could be integrated into CuPy or if we could implement them somewhat easily on top of already supported CuPy functionality?
This is a question for @moody-marlin and @mcg1969
I expect that many large datasets will be highly repetitive. That is that I expect a sample of the data to be decently representative of the full dataset. Given this structure, it feels inefficient for our iterative algorithms to go over the entire dataset before updating parameters.
Are there situations in which it is advantageous to cycle through different subsets of the full dataset?
As a naive (probably wrong) example perhaps we would partition the data into ten randomly selected subsets, and then perform a single round of gradient descent on each in turn to obtain a new gradient direction.
Probably not the most orthodox thing to put in a GitHub issue, but it seems like it could be helpful for this project.
In the latest SIAM Review a paper by Fercoq and Richtárik appears: Optimization in High Dimensions vis Accelerated, Parallel, Coordinate Descent. I've got a paper copy, and I know the second author, and can certainly get an electronic copy if interested. Here is a preprint.
I can vouch for these folks, they've been working for years to parallelize some of the very optimization problems we're aiming to tackle here.
An all-too-often ignored side of optimization is the initialization; there is a lot of research out there suggesting that for both convex (and even more so for non-convex) optimization problems, a large amount of work can be saved by initializing algorithms at clever starting values.
Currently we are initializing all algorithms with the 0 vector. Once the API (#11) is sorted out, we should have multiple options for how to initialize, including (but not limited to):
refit
method, to be raised in a future issue)cc: @mpancia
@TomAugspurger has mentioned having a first release soon; I'd like to collect the things that we think are blocking that from happening at this moment and check them off the list (any of these left un-checked are open for anyone to work on):
p
-values for non-regularized problems)l1
regularization, don't do newton
)kwarg
to all algorithmsAnything else that I'm missing that we should have before a release?
cc: @hussainsultan
As promised in #73, I'd like to share here some sample code I've been working on, and the results I've obtained. Please note, unfortunately this is not reproducible elsewhere yet, this depends on some work in progress branches I have not just in Dask-GLM, but also Dask, CuPy and NumPy, so unfortunately, it will take at least a few more weeks until this is publicly reproducible. We're actively working with all projects I've mentioned before to have all these fixes and features integrated as soon as possible.
import cupy
import dask_glm.estimators
# x from 0 to 300
x = 300 * cupy.random.random((10000, 1))
# y = a*x + b with noise
y = 0.5 * x + 1.0 + cupy.random.normal(size=x.shape)
# create a linear regression model
est = dask_glm.estimators.LinearRegression(fit_intercept=False, solver='proximal_grad')
est.fit(x, y)
Here's the timing output of the last line alone:
CPU times: user 357 ms, sys: 99.2 ms, total: 456 ms
Wall time: 468 ms
Using gradient_descent
as solver:
CPU times: user 985 ms, sys: 417 ms, total: 1.4 s
Wall time: 1.4 s
And using lbfgs
solver with NumPy (not CuPy!), which was the fastest solver using NumPy as backend:
CPU times: user 3.53 ms, sys: 311 µs, total: 3.84 ms
Wall time: 2.35 ms
And finally, using sklearn.linear_model.LinearRegression with all default arguments, except of fit_intercept
(set to False
, just like in the previous examples):
CPU times: user 2.31 ms, sys: 333 µs, total: 2.64 ms
Wall time: 1.59 ms
I believe there may be differences in the default algorithm or default parameters used by default in sklearn. Does anyone know if that's the case and how could we have more of an apples to apples comparison?
It's worth mentioning, I intentionally used fit_intercept=False
because setting it to True
is still depending on fixing existing issues in Dask. I didn't present output for other Dask-GLM solvers using the CuPy backend because they are still not working, see #73.
It appears this is currently unmaintained. If that is the case, should we go ahead and archive this repo?
Spotted on dask/dask-cloudprovider#244 (comment). Read The Docs was still building latest
from the master
branch and builds were failing. It was configured to build from the default branch, but had cached the value master
somewhere. Manually pointing it to main
in the RtD control panel fixed things.
In some cases we may wish to rechunk our data prior to execution. This can help to balance between high scheduling overheads (too many tasks) and poor load balancing (too few tasks).
It appears that different algorithms have different optimum sizes. For example algorithms with low task counts like ADMM benefit from smaller chunksizes while algorithms with many small tasks like gradient/proximal descent benefit from larger chunksizes.
If I slightly modify the inner local_update
solves in the current ADMM implementation as follows:
def local_update(y, X, w, z, u, rho, fprime=proximal_logistic_gradient,
f=proximal_logistic_loss,
solver=fmin_l_bfgs_b):
w = w.ravel()
u = u.ravel()
z = z.ravel()
solver_args = (X, y, z, u, rho)
w, f, d = solver(f, w, fprime=fprime, args=solver_args, pgtol=1e-10,
maxiter=200,
maxfun=250, factr=1e-30)
if d['warnflag']:
raise ValueError(
'''Internal LBFGSB algorithm failed to converge!
Details: {}'''.format(d['task']))
else:
return w.reshape(2, 1)
Then on random data I see convergence issues:
from dask_glm.utils import make_y
X = da.random.random((100,2), chunks=(100/5, 2))
y = make_y(X)
logistic_regression(X, y[:,None], 0.1, 0.1, 1)
With the following error:
Traceback (most recent call last):
...
dask.async.ValueError: Internal LBFGSB algorithm failed to converge!
Details: b'ABNORMAL_TERMINATION_IN_LNSRCH'
which refers to abnormal termination in the line search (always with the line search...).
From the fmin_l_bfgs_b
output:
Line search cannot locate an adequate point after 20 function
and gradient evaluations. Previous x, f and g restored.
Possible causes: 1 error in function or gradient evaluation;
2 rounding error dominate computation.
The environment.yml
doesn't match CI. For example, in environment.yml
python=3.5.2
is required but in CI, python=3.7
is used.
Describe the issue:
Version 0.3.0 includes changes that break the compatibility of some functions - notably the families - to work with non-dask data.
As one example, the newton function changed the line calculating betas from:
beta = np.zeros_like(X, shape=p)
to:
beta = np.zeros_like(X._meta, shape=p)
which will fail for any non-dask data.
Minimal Complete Verifiable Example:
from dask_glm.algorithms import newton
import numpy as np
X = np.random.random((100, 3))
y = np.random.random((100,))
newton(X, y)
Anything else we need to know?: If this is an intended change, would be good to call out the reason for it (sorry if I missed it).
Environment: Local (macbook pro - M1)
2023.3.0
After a bit of profiling, this is what I found out for Dask-GLM with Dask array:
14339 0.139 0.000 0.814 0.000 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:430(fire_task)
44898 19.945 0.000 19.945 0.000 {method 'acquire' of '_thread.lock' objects}
4055 0.042 0.000 19.992 0.005 /usr/lib/python3.5/threading.py:261(wait)
14339 0.107 0.000 20.234 0.001 /usr/lib/python3.5/queue.py:147(get)
14339 0.018 0.000 20.253 0.001 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:140(queue_get)
122 0.117 0.001 22.327 0.183 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:345(get_async)
122 0.013 0.000 22.346 0.183 /home/pentschev/.local/lib/python3.5/site-packages/dask/threaded.py:33(get)
122 0.004 0.000 22.733 0.186 /home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute)
1 0.020 0.020 23.224 23.224 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:200(admm)
1 0.000 0.000 23.267 23.267 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/utils.py:13(normalize_inputs)
1 0.000 0.000 23.268 23.268 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/estimators.py:65(fit)
A big portion of the time seems to be spent on waiting for thread lock. Also, looking at the callers, we see 100 compute()
calls departing from admm()
, which means it's not converging and stopping only at max_iter
as @cicdw suggested:
/home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute) <- 100 0.004 19.637 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:197(admm)
Running with NumPy, the algorithm converges, showing only 7 compute()
calls:
/home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute) <- 7 0.000 0.120 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:197(admm)
I'm running Dask 1.1.4 and Dask-GLM master branch, to ensure that my local changes aren't introduce any bugs. However, if I run my Dask-GLM branch and use CuPy as a backend, it also converges in 7 iterations.
To me this seems to suggest that we have one of those very well-hidden and difficult to track bugs in Dask. Before I spent hours with this, any suggestions what could we look for?
Originally posted by @pentschev in dask/dask-blog#15
Note also that Dask-GLM estimators were deprecated in #66 in favor of Dask-ML. If this is truly a bug in Dask-GLM, it may have been fixed in Dask-ML already.
For running dask-glm
in internal (enterprise) environment, i run into scenarios where a conda-forge package is much easier to mirror internally than straight from the git (external) repo.
What happened:
When I set solver=lbfgs
, I get a warning:
/Users/scott/anaconda3/lib/python3.8/site-packages/dask/config.py:588: UserWarning: Configuration key "fuse_ave_width" has been deprecated. Please use "optimization.fuse.ave-width" instead
What you expected to happen:
I did not expect this warning.
Minimal Complete Verifiable Example:
from sklearn.datasets import make_classification
from dask_ml.linear_model import LogisticRegression
X, y = make_classification()
est = LogisticRegression(solver="lbfgs")
est.fit(X, y)
Environment:
I tried out dask-glm on a subset of the criteo data here:
https://gist.github.com/mrocklin/1a1c0b011e187a750a050eb330ac36b2
This used the following:
I suspect that there is still a fair amount to do here to optimize performance and quality of the model
cc @moody-marlin @TomAugspurger @MLnick
If we use define l2
regularization as:
min_x ( f(x) + lambda / 2 \|x\|_2^2 )
then the regularizer is given by
1/2 \|x\|_2^2
which has proximal operator 1 / (1 +t) x
(which is what we currently have implemented). However, all of our other methods assume the regularizer is given by \|x\|_2^2
(without the 1/2 scaling).
This doesn't affect quality, but we should be consistent.
cc @postelrich
It'd be good to clarify the boundaries of dask-glm and dask-ml. My motivation is building up a set of utilities in dask-ml for working generically with dask or NumPy arrays, and dask or pandas dataframes.
I'd like to move the estimator interface over to dask-ml
and leave all the optimization and regularization logic here. I think that families
would stay in dask_glm.
With that reorganization, the interface is:
We should have a testing framework / experimentation engine to test the algorithms for robustness to various types of input data (normalized / not-normalized / bad condition numbers / partial separability / low class sizes, etc.) -- eventually we might even want some sort of switching method that, for example, might detect bad conditioning in Newton's method and switch to gradient descent. This will require first writing a way to generate such data.
So far in https://github.com/dask/dask-glm/blob/master/dask_glm/families.py we have families for linear/normal and logistic that look like the following:
class Logistic(object):
@staticmethod
def loglike(Xbeta, y):
eXbeta = exp(Xbeta)
return (log1p(eXbeta)).sum() - dot(y, Xbeta)
@staticmethod
def pointwise_loss(beta, X, y):
'''Logistic Loss, evaluated point-wise.'''
beta, y = beta.ravel(), y.ravel()
Xbeta = X.dot(beta)
return Logistic.loglike(Xbeta, y)
@staticmethod
def pointwise_gradient(beta, X, y):
'''Logistic gradient, evaluated point-wise.'''
beta, y = beta.ravel(), y.ravel()
Xbeta = X.dot(beta)
return Logistic.gradient(Xbeta, X, y)
@staticmethod
def gradient(Xbeta, X, y):
p = sigmoid(Xbeta)
return dot(X.T, p - y)
@staticmethod
def hessian(Xbeta, X):
p = sigmoid(Xbeta)
return dot(p * (1 - p) * X.T, X)
@jrbourbeau and I are in the process of moving the default branch for this repo from master to main.
Once the name on github is changed (the first box above is Xed, or this issue closed), when you try to git pull
you'll get
Your configuration specifies to merge with the ref 'refs/heads/master'
from the remote, but no such ref was fetched.
First: head to your fork and rename the default branch there
Then:
git branch -m master main
git fetch origin
git branch -u origin/main main
Hi,
In my current project, my team needs to use RidgeRegression with an SVD solver.
I implemented an initial version here, based on the sklearn implementation.
I also made an draft version for a RidgeRegressor to require an alpha
parameters. However, I saw #102 suggesting to move the estimator to https://github.com/dask/dask-ml/blob/main/dask_ml/linear_model/glm.py.
If there's interest in having the solver and estimator integrated into this project, I would be happy to work on it.
@moody-marlin pointed out that we need to add an environment.yml file for ease of use with conda.
Caused by: org.apache.spark.SparkException: Process List(/databricks/python/bin/pip, install, dask-glm==0.2.0, --disable-pip-version-check) exited with code 1. mlflow 1.5.0 requires alembic, which is not installed.
While using Databricks Installation breaks.
Dask-glm is using mlflow 1.5.0 ?
Can you upgrade?
Scikit-learn estimators have a n_iter_
parameter. Dask-GLM doesn't.
For example, the Scikit-Learn logistic regression implementation has the n_iter_
parameter (docs). However, the Dask-GLM implementation doesn't (docs).
This issue presented itself in dask/dask-blog#15 (comment)
One of the biggest hanging issues is to organize / discuss what the API will look like for dask_glm
and what sorts of things we expose. This decision impacts algorithm structure going forward (as we incorporate other GLM families and regularizers) so I will need some help focusing the following discussion on specific action items. My initial thoughts were to:
Aim for generality so users can create their own GLM families / regularizers: My initial thought along these lines were to have an Optimizer(**initialization_args, **numerical_algo_args, **regularization_args)
class that can be inherited by any class that implements func(), grad(), hessian()
methods. This would allow us to focus on algorithm development separately from model specification and add additional model types (Normal, Poisson, etc.) easily. The user then calls Logistic(X,y, **kwargs)
to initialize a model, and then .fit(**algo_args)
calls on the methods from Optimizer
. This can get hairy, and could possibly prevent further optimizations that exploit specific structure inherent in the models.
Hard code the models users can choose from: Per my conversation with @jcrist, we might not want to aim for extreme flexibility in the model API, as most people probably don't need / want to define their own GLM families or their own regularizers. Related, we might want to implement the algorithms specifically for each family / regularizer.
Regularized vs. unregularized models: How do we manage the 'switch' from regularized to unregularized models? Should it be something as simple as model = LogisticModel(X, y, reg='l2')
and the user chooses from a pre-defined list of regularizers, or should we allow for model = LogisticModel(X, y, reg=reg_func)
where reg_func()
is any regularization function on the parameters, allowing for customizations such as grouped lasso, non-negativity, etc? Again I was hoping to keep things as general as possible so that the user can mix and match, but on the algorithm side this can get difficult especially if we want speed and accuracy.
-Inherit from scikit-learn: Per my conversation with @hussainsultan, we might want to try and stick as closely as possible to scikit-learn
's API, as this is common enough to be comfortable for most people. However, we don't want to adhere to this too strongly (especially in output), because we started with the goal of intelligently toggling between inferential and predictive applications (thus requiring things like inferential stats that scikit-learn
currently doesn't include).
Ultimately, I think my thought process boils down to: should we expose an API to access the underlying algorithms or should we bake them into the various specific GLM families we want to consider?
To allow for easy testing of CuPy and Dask during this phase, a copy of the existing relevant Dask tests was created under #75. This incurs in a lot of duplication that can later be merged into a single implementation to test both backends. The same happens for the datasets module, which was duplicated for the time being.
People have submitted PRs to three forks/branches recently:
It would be good to merge these all to some fork/branch quickly so that we don't need to go through expensive merging later on. I don't particulaly care which fork/branch, but I would prefer to see things drive towards a common point. It becomes expensive to merge if things diverge for too long.
cc @moody-marlin @hussainsultan
(edited to not require the new API)
Probably other algorithms too. I see that proximal_grad does handle it correctly.
This little script compares the fit for scikit-learn LogisticRegression and our admm against a baseline of no-overflow, to a copy of the dataset with n
values in a range that will overflow when passed through the sigmoid function.
import warnings
import dask.array as da
import numpy as np
from scipy.optimize import fmin_l_bfgs_b
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from dask_glm.algorithms import admm
warnings.simplefilter("ignore", RuntimeWarning)
C = 10
lambda_ = 1 / C
np.random.seed(2)
X, y = datasets.make_classification(n_classes=2, n_samples=1000)
Xda = da.from_array(X, (10, X.shape[1]))
yda = da.from_array(y, (10,))
lrbase = LogisticRegression(fit_intercept=False, C=C, max_iter=10).fit(X, y)
admm_base = admm(Xda, yda, lamduh=lambda_, max_iter=10)
def fit_and_compare(X, y, n):
X = X.copy()
np.random.seed(2)
idx = np.random.choice(np.arange(len(X)), size=(n,))
X[idx, 1] = np.random.randint(750, 1000, size=(n,))
lr = LogisticRegression(fit_intercept=False, C=C, max_iter=10)
lr.fit(X, y)
Xda = da.from_array(X, (10, X.shape[1]))
yda = da.from_array(y, (10,))
result = admm(Xda, yda, lamduh=lambda_, max_iter=10)
print(f"Overflow={n}")
print("Sklearn:", np.mean(np.abs(lrbase.coef_ - lr.coef_)))
print("admm :", np.mean(np.abs(admm_base - result)))
return lr.coef_, result
fit_and_compare(X, y, 0)
fit_and_compare(X, y, 5)
fit_and_compare(X, y, 10)
fit_and_compare(X, y, 50)
fit_and_compare(X, y, 100)
Outputs
Overflow=0
Sklearn: 6.21681525703e-17
admm : 1.04008099876e-13
Overflow=5
Sklearn: 0.00377521192273
admm : 0.0975875697572
Overflow=10
Sklearn: 0.00311021598129
admm : 0.118998783966
Overflow=50
Sklearn: 0.00055956264663
admm : 0.178748360839
Overflow=100
Sklearn: 0.00312859463805
admm : 0.196356993923
Hello!
I've made some changes to the ENet prox operator documentation, and I'd like to submit a pull request and have it looked at.
I've put it in my own branch on my machine - but when I try to make a PR I get told that I'm not authenticated to do that.
More generally, I've got time until the new year to write some more documentation or I could re implement some of my PhD work on fully distributed ADMM (but I'd need help!).
Tom
Currently, the implementation of bfgs
isn't always converging, and the implementation is messy. Should we spend some time working on this, or toss it? If we're going to keep it, what exactly is going on with the implementation?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.