Giter Site home page Giter Site logo

dask-glm's Introduction

Dask

Build Status Coverage status Documentation Status Discuss Dask-related things and ask for help Version Status NumFOCUS

Dask is a flexible parallel computing library for analytics. See documentation for more information.

LICENSE

New BSD. See License File.

dask-glm's People

Contributors

agramfort avatar cicdw avatar daxiongshu avatar fjetter avatar hussainsultan avatar inati avatar jacobtomlinson avatar jdlesage avatar jrbourbeau avatar milesgranger avatar mlnick avatar mpancia avatar mrocklin avatar postelrich avatar thomasjpfan avatar tomaugspurger avatar zdgriffith avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

dask-glm's Issues

Compute_stepsize function in proximal_grad

Currently the stepsize computation in proximal_grad is still embedded within the code, which makes it hard to manage it with Dask. If the proximal_grad method is useful then it would be nice to pull this function out.

cc @moody-marlin

'Array' object has no attribute '_check_array' error when fitting Linear Regression

Code:

from dask_ml.linear_model import LinearRegression as DLR
from dask_ml.datasets import make_regression

X, y = make_regression(n_samples = 500_000, n_features = 60, chunks=10000)
dask_model = DLR(fit_intercept=False)

DLR.fit(X, y)
anaconda3/envs/learn-new-pkgs/lib/python3.7/site-packages/dask_ml/linear_model/glm.py in fit(self, X, y)
    181         self : objectj
    182         """
--> 183         X = self._check_array(X)
    184 
    185         solver_kwargs = self._get_solver_kwargs()

AttributeError: 'Array' object has no attribute '_check_array'

package versions

dask                      2.6.0                      py_0    conda-forge
dask-core                 2.6.0                      py_0    conda-forge
dask-glm                  0.2.0                      py_1    conda-forge
dask-jobqueue             0.6.3                      py_0    conda-forge
dask-ml                   1.0.0                      py_1    conda-forge

distributed and/or asynchronous algorithms for numeric methods

Hi all,

I am a second year PhD student in mechanical engineering at UC Davis studying numeric methods for optimal control. I just learned about dask and I'm excited about using it for the types of problems I do in my research. I think it might already, or be very close to, be a really great way to increase performance for common numeric methods in scientific computing.

In general, it seems that if an implementation of a method that maximizes the number of "sequential" computations between control flow structures and using concurrent futures for control flows (like error checks) would get significant speed boosts since the dask scheduler optimizes and distributes the computational graph. For example, if one were to daskify an algorithm like a Runge-Kutta ODE solver and all the derivative callbacks, the scheduler should automatically perform the equivalent of common sub-expression collection and the parallelization of inter-step computations. Then dask-based higher level logic (such as comparing the results of multiple simulations) or algorithms (like shooting methods for solving boundary value problems) could add the lower-level computation tasks to the scheduler so computation could be processed in parallel without the user having to choose one level to parallelize at.

This leads to some questions:

  • Does it seem like I’m understanding how to structure this type of work with dask correctly?
  • Would there be interest in daskifying numeric algorithms like Runge-Kutta ODE solvers or “small” optimization methods? And/or implementing inherently distributed and/or asynchronous algorithms (I see several issues related to ADMM for convex optimization)?
  • I also see an issue for chunked linear algebra methods; is there any interest in improving performance for dense (small/medium sized) linear algebra for single workers (i.e., calling the best BLAS/LAPACK calls), perhaps through some data modeling? A combination of the chunked and optimized BLAS/LAPACK calls seems very compelling to me, but I havne't looked at this very thoroughly yet.

Thanks,
Ben

Related issues:
dask/dask#2225
#3
#5
#57

P.S. I'm posting in this repo because more of the related issues that I could find were here; I'd be happy to close this and re-post in another repository if it would be more appropriate. Just let me know.

Approximate exponents

Fun fact: the current ADMM implementation can spend almost half of it's time computing np.exp

Script

from dask import persist
import dask.array as da
import numpy as np
from dask_glm.logistic import admm
from dask_glm.utils import make_y

N = 1e7
M = 2
chunks = 1e6
seed = 20009

X = da.random.random((N, M), chunks=(chunks, M))
y = make_y(X, beta=np.array(list(range(M))), chunks=chunks)

X, y = persist(X, y)

%%prun
import dask
with dask.set_options(get=dask.get):
    beta = admm(X, y)

Profile results

I use a wrapped version of np.exp just so that it shows up in profile results.

         565331 function calls (532847 primitive calls) in 93.202 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     2254   41.543    0.018   41.543    0.018 utils.py:25(exp)
     1127   30.614    0.027   52.086    0.046 logistic.py:349(logistic_loss)
     1127   13.122    0.012   40.358    0.036 logistic.py:364(logistic_gradient)
     1127    6.267    0.006   27.191    0.024 logistic.py:343(sigmoid)
     1130    0.793    0.001    0.793    0.001 {method 'reduce' of 'numpy.ufunc' objects}
        8    0.078    0.010    0.079    0.010 executionengine.py:100(finalize_object)
     1127    0.052    0.000   40.409    0.036 logistic.py:373(proximal_logistic_gradient)
     1127    0.049    0.000   52.135    0.046 logistic.py:358(proximal_logistic_loss)
        8    0.043    0.005    0.043    0.005 passmanagers.py:94(run)
       30    0.034    0.001   93.040    3.101 lbfgsb.py:205(_minimize_lbfgsb)
     2254    0.031    0.000   41.582    0.018 dispatcher.py:152(__call__)
     1127    0.029    0.000   92.998    0.083 lbfgsb.py:277(func_and_grad)
26957/12971    0.027    0.000    0.083    0.000 {method 'format' of 'str' objects}
     1127    0.021    0.000   52.417    0.047 optimize.py:290(function_wrapper)
    80776    0.019    0.000    0.030    0.000 {built-in method isinstance}
     1127    0.013    0.000    0.814    0.001 fromnumeric.py:1743(sum)

Comments

Here are some comments from @stuartarchibald

How many exp() are you doing at a time?
If you are doing many, are the input values of similar magnitude?
Do you care about IEEE754 correctness over NaN and Inf?

When I've deal with this previously, most of the time spent in a loop over exp() was in a) feraiseexcept or similar dealing with inf/nan b) branch misprediction, the split points are usually f(log_e(2)) IIRC.

a) goes away if you don't care
b) goes away if you can guarantee a range or aren't too bothered about inaccuracy out of range. Remez algs are usually used to compute the coefficient table for a polynomial and a bunch of shifts are done to get values into appropriate range.
c) if you have a load of values to compute, perhaps try Intel VML?

cc @seibert @mcg1969

Logistic Accuracy tests

We need to determine a robust testing framework for our logistic regression algorithms, starting with a review of the accuracy of the current ADMM code.

Unregularized Problems

  • For unregularized problems with an intercept, we can start with the high-level test
    y.sum() = sigmoid(X.dot(beta)).sum()
  • For unregularized problems without an intercept, we can create a handful of tiny datasets to test on, or use well-studied public datasets (e.g., iris)

Regularized Problems

  • As @hussainsultan already did, we can start with some high level 'marginal' tests which test extreme values for the various input parameters (e.g., when the regularization parameter is 'large' for l1-problems, the coefficients should all be 0)
  • For regularized problems, we might need to hand craft a few tiny datasets to test on for accuracy

BFGS Segmentation Fault

On my local machine, bfgs() is creating a segmentation fault anytime I try to run it.

import dask.array as da
from dask_glm.utils import make_y
from dask_glm.logistic import bfgs

X = da.random.random((100,2), chunks=(20,2))
y = make_y(X)
bfgs(X, y)

yields:

//anaconda/lib/python3.5/site-packages/dask/array/core.py:464: RuntimeWarning: overflow encountered in true_divide
  o = func(*args, **kwargs)
Segmentation fault: 11

Possibly related to dask/dask#1927 ?

Profile results

On eight m4.2xlarges I created the following dataset

N = 1e8
beta = np.array([-1, 0, 1, 2])
M = 4
chunks = 1e6
seed = 20009

X = da.random.random((N, M), chunks=(chunks, M))
z0 = X.dot(beta)
y = da.random.random(z0.shape, chunks=z0.chunks) < sigmoid(z0)

X, y = persist(X, y)

I then ran the various methods within this project and recorded the profiles as bokeh plots. They are linked to below:

Additionally, I ran against a 10x larger dataset and got the following results

Most runtimes were around a minute. The BFGS solution gave wrong results.

Notes

On larger problems with smallish chunks (8 * 4 * 1e6 == 24 MB) we seem to be bound by scheduling overhead. I've created an isolated benchmark here that is representative of this overhead: https://gist.github.com/mrocklin/48b7c4b610db63b2ee816bd387b5a328

Add default arguments to ADMM code

I started trying to play with @hussainsultan 's ADMM code. However, it requires many parameters for which I do not know good values. Can we add sane defaults there? Do we know sane defaults?

Short term it would be useful to me to always be able to do the following for all solver routines:

result = func(X, y)

Migrate CI to GitHub Actions

Due to changes in the Travis CI billing, the Dask org is migrating CI to GitHub Actions.

This repo contains a .travis.yml file which needs to be replaced with an equivalent .github/workflows/ci.yml file.

See dask/community#107 for more details.

ADMM shape changes

A few shape issues that arose in #2 :

  • Change the ADMM implementation to accept y arrays of shape (X.shape[0], ), possibly by simply adding the line y = y[:, None] before restructuring too much of the code.
  • Change the ADMM implementation to accept X, y arrays that are not split into 5 chunks, but an arbitrary number of chunks / arbitrary chunk distribution.

@hussainsultan

Asynchronous solvers

So far all of our solvers are synchronous. They compute full results in lock-step, for example switching between performing a parallel mat-vec, then doing a line search, and then doing a mat-vec again. These algorithms are common on single-machine hardware but may not be ideal for larger clusters.

The distributed scheduler provides some decent capabilities for full asynchronous computing, which may open us up to new algorithms. Are there asynchronous variants to some of these algorithms that may interest us?

Quick example of asynchronous code:

data_futures = client.map(load_chunk, chunks)
params = {...}

futures = client.map(compute_update, random.sample(100, data_futures), **params)

ac = as_completed(futures)  # collection of running futures that yield in order of completion
for future in ac:
    update_info, score = ac.result()
    if is_good(score):
        break
    update_params(params, update_info)
    new_future = client.submit(compute_update, random.choice(data_futures), **params)
    ac.add(new_future)

@hussainsultan @mcg1969 @jcrist @moody-marlin

See Also

Relax requirements

Currently requirements.txt specifies exact versions. This makes it very hard to use dask-glm with other libraries.

We should probably use >= modifiers instead and choose older versions of libraries if possible.

Parameter server

This is a summary of an e-mail between myself and @MLnick

From Nick

Taking the simplest version of say logistic regression, the basic idea is to split up the parameter vector (beta) itself into chunks (so it could be a dask array potentially, or some other distributed dask datastructure). Training would then iterate over "mini-batches" using SGD (let's say each mini batch is a chunk of the X array). In each mini batch, the worker will "pull" the latest version of "beta" from the Parameter Server and compute the (local) gradient for the batch. The worker then sends this gradient to the PS, which then performs the update (i.e. update its part of "beta" using say the gradient update from the worker and the step size). The next iteration then proceeds in the same way. This can be sync or async (but typically is either fully async or "bounded stale" async).

The key is to do this effectively as direct communication from the worker doing the mini batch gradient computation, to the worker holding the parameters (the "parameter server"), without involving the master ("client" app) at all, and to only "pull" and "push" the part of beta required for local computation (due to sparsity this doesn't need to be the full beta in many cases). In situations where the data is very sparse (e.g. like the Criteo data) the communication is substantially reduced in this approach. And the model size can be scaled up significantly (e.g. for FMs the model size can be very large).

This is slightly different from the way say L-BFGS works currently (and the way I seem to understand ADMM works in dask-glm) - i.e. that more or less a set of local computations are performed on the distributed data on the workers, and the results collected back to the "master", where an update step is performed (using LBFGS or the averaging of ADMM, respectively). This is also the way Spark does things.

What I'm struggling with is quite how to achieve the PS approach in dask. It seems possible to do it in a few different ways, e.g. perhaps it's possible just using simple distributed dask arrays, or perhaps using "worker_client" and/or Channels. The issue I have is how to let each worker "pull" the latest view of "beta" in each iteration, and how to have each worker "push" its local gradient out to update the "beta" view, without the "master" being involved.

I'm looking into the async work in http://matthewrocklin.com/blog/work/2017/04/19/dask-glm-2 also to see if I can do something similar here.

From me

First, there are two nodes that you might consider the "master", the scheduler and the client. This is somewhat of a deviation from Spark, where they are both in the same spot.

Second, what are your communication and computation requirements? A roundtrip from the client to scheduler to worker to scheduler to client takes around 10ms on a decent network. A worker-worker communication would be shorter, definitely, but may also involve more technology. We can do worker-to-worker direct, but I wanted to make sure that this was necessary.

Channels currently coordinate metadata through the scheduler. They work a bit like this:

  1. Worker A subscribes to channel, tells scheduler
  2. Worker B subscribes to channel, tells scheduler
  3. Worker A creates some data and registers it on the channel, tells the scheduler
  4. Scheduler tells all workers that are on this channel (A and B) that a new piece of data is on the channel
  5. Worker B says great, I want this data, and asks the scheduler where it can get it
  6. Scheduler tells Worker B that the data is on Worker A
  7. Worker B gets data from Worker A

So there are a few network hops here, although each should be in the millisecond range (I think?).

We could also set up a proper parameter server structure with single hop communicatinos. Building these things isn't hard. As usual my goal is to extract from this experiment something slightly more general to see if we can hit a broader use case.

So I guess my questions become:

  1. What are your communication requirements
  2. How much data are you likely to shove through this
  3. Are you likely to have multiple parameter servers? If so how would you anticipate sharding communication?

From Nick

The PS idea is very simple at the high level. The "parameter server" can be thought of as a "distributed key-value store". There could be 1 or more PS nodes (the idea is precisely to allow scaling the size of model parameters across multiple nodes, such as in the case of factorization machines, neural networks etc).

A good reference paper is https://www.cs.cmu.edu/~muli/file/parameter_server_osdi14.pdf

So in theory, at the start of an iteration, a worker node asks the PS for only the parameters it needs to compute its update (in sparse data situations, this might only be a few % of the overall # features, per "partition" or "batch"). This can be thought of as a set of (key, value) pairs where the keys are vector indices and the values are vector values at the corresponding index, of the parameter vector. In practice, each PS node will hold a "slice" of the parameter vector (the paper uses a chord key layout for example), and will work with vectors rather than raw key-value pairs, for greater efficiency.

It seems like Channels might be a decent way to go about this. Yes, there is some network comm overhead but in practice for a large scale problem, the time to actually send the data (parameters and gradients say) would dominate the few ms of network hops. This cost could also be partly hidden through async operations.

The way I thought about it with Channels, which you touch on is:

  1. Let's say we have 1x PS worker for simplicity, and some other "compute" workers. The "compute" workers will hold the chunks of data (X, y blocks). The PS will hold "beta".
  2. PS creates an initial beta vector (random data, zeros, whatever). It could "publish" this vector (future?) on the Channel "params", saying "here is the latest version of beta".
  3. Workers start iteration 1, and pull the new "beta" (future?) off the channel. Let's say Worker A perhaps needs 10% of the total vector - so it "pulls" beta[idx] from PS - where idx is the set of active feature indices it needs to compute it's gradient.
  4. Worker A computes its partial gradient for the chunk. It needs to "push" this grad[idx] (or alternatively, a "sparse vector" version of grad) to the PS. It could push this as another future into the channel? Or perhaps another channel? But would the idea be that PS gets this future off the channel, knows that Worker A holds the data it needs, and does something like beta[idx] -= grad[idx] * step_size (simplified update), where it will know to pull grad[idx] from Worker A? And then "publishes" the new "beta future" on the "params" channel?
  5. This all happens async - so effectively a "slow" worker may "miss" a few beta updates. Workers could always poll the head of the channel for the latest.
  6. The PS could in this way implement some form of "bounded synchronous" updates.

To answer your specific questions:

  1. As I mention above, of course we'd prefer to have lowest cost communication for the above scenario - but I would expect a few ms overhead from network hops to be marginal in terms of overall cost. I would tend to start with what is "built in" and see if it works well, before trying to build more custom stuff.
  2. Quite a lot - that is the idea, to scale to large models. By large I would say typically 100s millions - billions of parameters in total. Each mini-batch would not typically communicate that entire parameter space, but it could still be a few million parameters per mini batch.
  3. Yes - though even 1 PS can be useful in scaling. Sharding can range from simply splitting the param vector in contiguous chunks, to "key chord" layouts and other more involved architectures (mostly this is done for fault tolerance purposes).

From me

So here is some code just to get things started off:

def parameter_server():
    beta = np.zeros(1000000)
    with worker_client() as c:
        betas = c.channel('betas', maxlen=1)
        future_beta = c.scatter(beta)
        betas.append(future_beta)

        updates = c.channel('updates')
        for update in updates:
            beta = modify(beta, update)
            future_beta = c.scatter(beta)
            betas.append(future_beta)


def worker(idx, x):
    with worker_client(separate_thread=False) as c:
        betas = c.channel('betas', maxlen=1)
        last_beta = betas.data[-1]
        subset_beta = c.submit(operator.getitem, last_beta, idx).result()
        params = subset_beta.result()

        update = create_update(x, params)
        updates = c.channel('updates')
        updates.append(update)
        updates.flush()

For what it's worth I expect this code to fail in some way. I think that channels will probably have to be slightly modified somehow. For example currently we're going to record all of the updates that have been sent to the updates channel. We need to have some way of stating that a reference is no longer needed. Channels need some mechanism to consume and destroy references to futures safely.

How to handle intercept term?

What is the best way to handle intercepts?

Right now, the algorithms assume the user creates a column of 1s in their dask array, à la statsmodels. However, sometimes it's convenient to have a fit_intercept option similar to scikit-learn. Having this option set to True will require a step which appends a column of 1's to the user-supplied dask array, but it won't be as simple as the corresponding numpy case.

@mrocklin

Dask-glm throws ContextualVersionConflict

After running either pip install dask-glm or conda install dask-glm the dask-glm-bench notebook fails to import the line from dask_glm.algorithms import proximal_grad, admm with a ContextualVersionConflict error:

ContextualVersionConflict                 Traceback (most recent call last)
<ipython-input-11-ccb1387c99f5> in <module>()
      2 import dask.array as da
      3 import numpy as np
----> 4 from dask_glm.algorithms import proximal_grad, admm
      5 from dask_glm.utils import make_y

/opt/anaconda/lib/python3.5/site-packages/dask_glm/__init__.py in <module>()
      1 from pkg_resources import get_distribution, DistributionNotFound
      2 try:
----> 3     __version__ = get_distribution(__name__).version
      4 except DistributionNotFound:
      5     pass

/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in get_distribution(dist)
    555         dist = Requirement.parse(dist)
    556     if isinstance(dist, Requirement):
--> 557         dist = get_provider(dist)
    558     if not isinstance(dist, Distribution):
    559         raise TypeError("Expected string, Requirement, or Distribution", dist)

/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in get_provider(moduleOrReq)
    429     """Return an IResourceProvider for the named module or requirement"""
    430     if isinstance(moduleOrReq, Requirement):
--> 431         return working_set.find(moduleOrReq) or require(str(moduleOrReq))[0]
    432     try:
    433         module = sys.modules[moduleOrReq]

/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in require(self, *requirements)
    966         included, even if they were already activated in this working set.
    967         """
--> 968         needed = self.resolve(parse_requirements(requirements))
    969 
    970         for dist in needed:

/opt/anaconda/lib/python3.5/site-packages/setuptools-27.2.0-py3.5.egg/pkg_resources/__init__.py in resolve(self, requirements, env, installer, replace_conflicting)
    857                 # Oops, the "best" so far conflicts with a dependency
    858                 dependent_req = required_by[req]
--> 859                 raise VersionConflict(dist, req).with_context(dependent_req)
    860 
    861             # push the new requirements onto the stack

ContextualVersionConflict: (multipledispatch 0.4.8 (/opt/anaconda/lib/python3.5/site-packages), Requirement.parse('multipledispatch>=0.4.9'), {'dask-glm'})

Both pip freeze and conda list show that multipledispatch version 0.4.9 is installed satisfying the dask-glm requirement, so I am not sure what to do here.

NEP-18: CuPy backend requires unimplemented algorithms

For the past few weeks, I've been working on issues related to NEP-18 support for Dask (and ultimately, Dask-GLM as well) to allow CuPy to be used as a backend. I've made some progress, which I'll soon share with more details.

Among the issues I have now is the usage of the following Dask-GLM algorithms with CuPy:

  1. newton: requires numpy.linalg.lstsq
  2. admm and lbfgs: require scipy.optimize.fmin_l_bfgs_b

Both of these functions are not implemented in CuPy. So I'm wondering if anybody knows whether any of these functions have some open and reliable CUDA implementation that could be integrated into CuPy or if we could implement them somewhat easily on top of already supported CuPy functionality?

Question about convergence on repetitive data

This is a question for @moody-marlin and @mcg1969

I expect that many large datasets will be highly repetitive. That is that I expect a sample of the data to be decently representative of the full dataset. Given this structure, it feels inefficient for our iterative algorithms to go over the entire dataset before updating parameters.

Are there situations in which it is advantageous to cycle through different subsets of the full dataset?

As a naive (probably wrong) example perhaps we would partition the data into ten randomly selected subsets, and then perform a single round of gradient descent on each in turn to obtain a new gradient direction.

A new accelerated, parallel, proximal descent method

Probably not the most orthodox thing to put in a GitHub issue, but it seems like it could be helpful for this project.

In the latest SIAM Review a paper by Fercoq and Richtárik appears: Optimization in High Dimensions vis Accelerated, Parallel, Coordinate Descent. I've got a paper copy, and I know the second author, and can certainly get an electronic copy if interested. Here is a preprint.

I can vouch for these folks, they've been working for years to parallelize some of the very optimization problems we're aiming to tackle here.

Initialization Routines

An all-too-often ignored side of optimization is the initialization; there is a lot of research out there suggesting that for both convex (and even more so for non-convex) optimization problems, a large amount of work can be saved by initializing algorithms at clever starting values.

Currently we are initializing all algorithms with the 0 vector. Once the API (#11) is sorted out, we should have multiple options for how to initialize, including (but not limited to):

  • random Gaussian initializations
  • running some other, faster algorithm at a very low tolerance (ex: initialization Newton with the output of gradient descent set at a very low tolerance setting)
  • outputs of previous runs (will be built into a refit method, to be raised in a future issue)
  • more interesting but academically well-grounded ideas

cc: @mpancia

First Release Discussion

@TomAugspurger has mentioned having a first release soon; I'd like to collect the things that we think are blocking that from happening at this moment and check them off the list (any of these left un-checked are open for anyone to work on):

  • some minimal statistical output (e.g., p-values for non-regularized problems)
  • docs
  • some sort of algorithm selection criteria (nothing complicated or fancy, just, e.g., if l1 regularization, don't do newton)
  • add initialization kwarg to all algorithms

Anything else that I'm missing that we should have before a release?

cc: @hussainsultan

NEP-18: Performance issues using CuPy as backend

As promised in #73, I'd like to share here some sample code I've been working on, and the results I've obtained. Please note, unfortunately this is not reproducible elsewhere yet, this depends on some work in progress branches I have not just in Dask-GLM, but also Dask, CuPy and NumPy, so unfortunately, it will take at least a few more weeks until this is publicly reproducible. We're actively working with all projects I've mentioned before to have all these fixes and features integrated as soon as possible.

import cupy
import dask_glm.estimators

# x from 0 to 300
x = 300 * cupy.random.random((10000, 1))

# y = a*x + b with noise
y = 0.5 * x + 1.0 + cupy.random.normal(size=x.shape)

# create a linear regression model
est = dask_glm.estimators.LinearRegression(fit_intercept=False, solver='proximal_grad')

est.fit(x, y)

Here's the timing output of the last line alone:

CPU times: user 357 ms, sys: 99.2 ms, total: 456 ms
Wall time: 468 ms

Using gradient_descent as solver:

CPU times: user 985 ms, sys: 417 ms, total: 1.4 s
Wall time: 1.4 s

And using lbfgs solver with NumPy (not CuPy!), which was the fastest solver using NumPy as backend:

CPU times: user 3.53 ms, sys: 311 µs, total: 3.84 ms
Wall time: 2.35 ms

And finally, using sklearn.linear_model.LinearRegression with all default arguments, except of fit_intercept (set to False, just like in the previous examples):

CPU times: user 2.31 ms, sys: 333 µs, total: 2.64 ms
Wall time: 1.59 ms

I believe there may be differences in the default algorithm or default parameters used by default in sklearn. Does anyone know if that's the case and how could we have more of an apples to apples comparison?

It's worth mentioning, I intentionally used fit_intercept=False because setting it to True is still depending on fixing existing issues in Dask. I didn't present output for other Dask-GLM solvers using the CuPy backend because they are still not working, see #73.

Archive this repo?

It appears this is currently unmaintained. If that is the case, should we go ahead and archive this repo?

Optimal chunksizes

In some cases we may wish to rechunk our data prior to execution. This can help to balance between high scheduling overheads (too many tasks) and poor load balancing (too few tasks).

It appears that different algorithms have different optimum sizes. For example algorithms with low task counts like ADMM benefit from smaller chunksizes while algorithms with many small tasks like gradient/proximal descent benefit from larger chunksizes.

Abstract Base Classes

We should probably implement abstract base classes for Regularizer and Family with corresponding documentation; as #47 and #46 have highlighted, the lack of a solid template can lead to problems.

  • Regularizer Base Class
  • Family Base Class

ADMM: Inner L-BFGS optimizations fail to converge

If I slightly modify the inner local_update solves in the current ADMM implementation as follows:

def local_update(y, X, w, z, u, rho, fprime=proximal_logistic_gradient,
                 f=proximal_logistic_loss,
                 solver=fmin_l_bfgs_b):
    w = w.ravel()
    u = u.ravel()
    z = z.ravel()
    solver_args = (X, y, z, u, rho)
    w, f, d = solver(f, w, fprime=fprime, args=solver_args, pgtol=1e-10,
                     maxiter=200,
                     maxfun=250, factr=1e-30)
    if d['warnflag']:
        raise ValueError(
            '''Internal LBFGSB algorithm failed to converge!
            Details: {}'''.format(d['task']))
    else:
        return w.reshape(2, 1)

Then on random data I see convergence issues:

from dask_glm.utils import make_y
X = da.random.random((100,2), chunks=(100/5, 2))
y = make_y(X)
logistic_regression(X, y[:,None], 0.1, 0.1, 1)

With the following error:

Traceback (most recent call last):
...
dask.async.ValueError: Internal LBFGSB algorithm failed to converge!
            Details: b'ABNORMAL_TERMINATION_IN_LNSRCH'

which refers to abnormal termination in the line search (always with the line search...).

From the fmin_l_bfgs_b output:

Line search cannot locate an adequate point after 20 function
  and gradient evaluations.  Previous x, f and g restored.
 Possible causes: 1 error in function or gradient evaluation;
                  2 rounding error dominate computation.

New version breaks compatibility with non-dask data

Describe the issue:

Version 0.3.0 includes changes that break the compatibility of some functions - notably the families - to work with non-dask data.

As one example, the newton function changed the line calculating betas from:

beta = np.zeros_like(X, shape=p)

to:

beta = np.zeros_like(X._meta, shape=p)

which will fail for any non-dask data.

Minimal Complete Verifiable Example:

from dask_glm.algorithms import newton
import numpy as np
X = np.random.random((100, 3))
y = np.random.random((100,))
newton(X, y)

Anything else we need to know?: If this is an intended change, would be good to call out the reason for it (sorry if I missed it).

Environment: Local (macbook pro - M1)

  • Dask version: 2023.3.0
  • Python version: 3.10.10
  • Operating System: Mac osx
  • Install method (conda, pip, source): pip

Dask-GLM doesn't converge with Dask array

After a bit of profiling, this is what I found out for Dask-GLM with Dask array:

    14339    0.139    0.000    0.814    0.000 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:430(fire_task)
    44898   19.945    0.000   19.945    0.000 {method 'acquire' of '_thread.lock' objects}
     4055    0.042    0.000   19.992    0.005 /usr/lib/python3.5/threading.py:261(wait)
    14339    0.107    0.000   20.234    0.001 /usr/lib/python3.5/queue.py:147(get)
    14339    0.018    0.000   20.253    0.001 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:140(queue_get)
      122    0.117    0.001   22.327    0.183 /home/pentschev/.local/lib/python3.5/site-packages/dask/local.py:345(get_async)
      122    0.013    0.000   22.346    0.183 /home/pentschev/.local/lib/python3.5/site-packages/dask/threaded.py:33(get)
      122    0.004    0.000   22.733    0.186 /home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute)
        1    0.020    0.020   23.224   23.224 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:200(admm)
        1    0.000    0.000   23.267   23.267 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/utils.py:13(normalize_inputs)
        1    0.000    0.000   23.268   23.268 /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/estimators.py:65(fit)

A big portion of the time seems to be spent on waiting for thread lock. Also, looking at the callers, we see 100 compute() calls departing from admm(), which means it's not converging and stopping only at max_iter as @cicdw suggested:

/home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute)                               <-     100    0.004   19.637  /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:197(admm)

Running with NumPy, the algorithm converges, showing only 7 compute() calls:

/home/pentschev/.local/lib/python3.5/site-packages/dask/base.py:345(compute)                          <-       7    0.000    0.120  /home/pentschev/.local/lib/python3.5/site-packages/dask_glm/algorithms.py:197(admm)

I'm running Dask 1.1.4 and Dask-GLM master branch, to ensure that my local changes aren't introduce any bugs. However, if I run my Dask-GLM branch and use CuPy as a backend, it also converges in 7 iterations.

To me this seems to suggest that we have one of those very well-hidden and difficult to track bugs in Dask. Before I spent hours with this, any suggestions what could we look for?

Originally posted by @pentschev in dask/dask-blog#15

Note also that Dask-GLM estimators were deprecated in #66 in favor of Dask-ML. If this is truly a bug in Dask-GLM, it may have been fixed in Dask-ML already.

create conda-forge package

For running dask-glm in internal (enterprise) environment, i run into scenarios where a conda-forge package is much easier to mirror internally than straight from the git (external) repo.

Warning on config "optimization.fuse.ave-width" w/ solver="lbfgs"

What happened:
When I set solver=lbfgs, I get a warning:

/Users/scott/anaconda3/lib/python3.8/site-packages/dask/config.py:588: UserWarning: Configuration key "fuse_ave_width" has been deprecated. Please use "optimization.fuse.ave-width" instead

What you expected to happen:
I did not expect this warning.

Minimal Complete Verifiable Example:

from sklearn.datasets import make_classification
from dask_ml.linear_model import LogisticRegression

X, y = make_classification()
est = LogisticRegression(solver="lbfgs")
est.fit(X, y)

Environment:

  • Dask-ML: 1.9.0
  • Python version: 3.8.8
  • Operating System: macOS
  • Install method (conda, pip, source): ?

Package reorganization

It'd be good to clarify the boundaries of dask-glm and dask-ml. My motivation is building up a set of utilities in dask-ml for working generically with dask or NumPy arrays, and dask or pandas dataframes.

I'd like to move the estimator interface over to dask-ml and leave all the optimization and regularization logic here. I think that families would stay in dask_glm.

With that reorganization, the interface is:

  • dask_glm for lower-level optimization routines on large dask arrays
  • dask_ml for higher-level estimators, which can work on dask, numpy, or pandas data structures.

Robustness Tests

We should have a testing framework / experimentation engine to test the algorithms for robustness to various types of input data (normalized / not-normalized / bad condition numbers / partial separability / low class sizes, etc.) -- eventually we might even want some sort of switching method that, for example, might detect bad conditioning in Newton's method and switch to gradient descent. This will require first writing a way to generate such data.

@mcg1969 @jcrist

Extend GLM Families

So far in https://github.com/dask/dask-glm/blob/master/dask_glm/families.py we have families for linear/normal and logistic that look like the following:

class Logistic(object):
    @staticmethod
    def loglike(Xbeta, y):
        eXbeta = exp(Xbeta)
        return (log1p(eXbeta)).sum() - dot(y, Xbeta)

    @staticmethod
    def pointwise_loss(beta, X, y):
        '''Logistic Loss, evaluated point-wise.'''
        beta, y = beta.ravel(), y.ravel()
        Xbeta = X.dot(beta)
        return Logistic.loglike(Xbeta, y)

    @staticmethod
    def pointwise_gradient(beta, X, y):
        '''Logistic gradient, evaluated point-wise.'''
        beta, y = beta.ravel(), y.ravel()
        Xbeta = X.dot(beta)
        return Logistic.gradient(Xbeta, X, y)

    @staticmethod
    def gradient(Xbeta, X, y):
        p = sigmoid(Xbeta)
        return dot(X.T, p - y)

    @staticmethod
    def hessian(Xbeta, X):
        p = sigmoid(Xbeta)
        return dot(p * (1 - p) * X.T, X)
  • Do we want to extend these to other GLM families?
  • If so then what other options are most useful?
  • Is the current interface sufficient and ideal?

Move default branch from "master" -> "main"

@jrbourbeau and I are in the process of moving the default branch for this repo from master to main.

  • Changed in GitHub
  • Merged PR to change branch name in code

What you'll see

Once the name on github is changed (the first box above is Xed, or this issue closed), when you try to git pull you'll get

Your configuration specifies to merge with the ref 'refs/heads/master'
from the remote, but no such ref was fetched.

What you need to do

First: head to your fork and rename the default branch there
Then:

git branch -m master main
git fetch origin
git branch -u origin/main main

Add conda environment.yml

@moody-marlin pointed out that we need to add an environment.yml file for ease of use with conda.

Upgrade Rquired in dask-glm

Caused by: org.apache.spark.SparkException: Process List(/databricks/python/bin/pip, install, dask-glm==0.2.0, --disable-pip-version-check) exited with code 1. mlflow 1.5.0 requires alembic, which is not installed.

While using Databricks Installation breaks.
Dask-glm is using mlflow 1.5.0 ?
Can you upgrade?

API discussion

One of the biggest hanging issues is to organize / discuss what the API will look like for dask_glm and what sorts of things we expose. This decision impacts algorithm structure going forward (as we incorporate other GLM families and regularizers) so I will need some help focusing the following discussion on specific action items. My initial thoughts were to:

  • Aim for generality so users can create their own GLM families / regularizers: My initial thought along these lines were to have an Optimizer(**initialization_args, **numerical_algo_args, **regularization_args) class that can be inherited by any class that implements func(), grad(), hessian() methods. This would allow us to focus on algorithm development separately from model specification and add additional model types (Normal, Poisson, etc.) easily. The user then calls Logistic(X,y, **kwargs) to initialize a model, and then .fit(**algo_args) calls on the methods from Optimizer. This can get hairy, and could possibly prevent further optimizations that exploit specific structure inherent in the models.

  • Hard code the models users can choose from: Per my conversation with @jcrist, we might not want to aim for extreme flexibility in the model API, as most people probably don't need / want to define their own GLM families or their own regularizers. Related, we might want to implement the algorithms specifically for each family / regularizer.

  • Regularized vs. unregularized models: How do we manage the 'switch' from regularized to unregularized models? Should it be something as simple as model = LogisticModel(X, y, reg='l2') and the user chooses from a pre-defined list of regularizers, or should we allow for model = LogisticModel(X, y, reg=reg_func) where reg_func() is any regularization function on the parameters, allowing for customizations such as grouped lasso, non-negativity, etc? Again I was hoping to keep things as general as possible so that the user can mix and match, but on the algorithm side this can get difficult especially if we want speed and accuracy.

-Inherit from scikit-learn: Per my conversation with @hussainsultan, we might want to try and stick as closely as possible to scikit-learn's API, as this is common enough to be comfortable for most people. However, we don't want to adhere to this too strongly (especially in output), because we started with the goal of intelligently toggling between inferential and predictive applications (thus requiring things like inferential stats that scikit-learn currently doesn't include).

Ultimately, I think my thought process boils down to: should we expose an API to access the underlying algorithms or should we bake them into the various specific GLM families we want to consider?

cc: @mrocklin @mcg1969

Merge CuPy and Dask tests and datasets

To allow for easy testing of CuPy and Dask during this phase, a copy of the existing relevant Dask tests was created under #75. This incurs in a lot of duplication that can later be merged into a single implementation to test both backends. The same happens for the datasets module, which was duplicated for the time being.

Merge forks/branches

People have submitted PRs to three forks/branches recently:

  1. dask/master
  2. moody-marlin/master
  3. moody-marlin/no_api

It would be good to merge these all to some fork/branch quickly so that we don't need to go through expensive merging later on. I don't particulaly care which fork/branch, but I would prefer to see things drive towards a common point. It becomes expensive to merge if things diverge for too long.

cc @moody-marlin @hussainsultan

ADMM is susceptible to overflow

(edited to not require the new API)

Probably other algorithms too. I see that proximal_grad does handle it correctly.
This little script compares the fit for scikit-learn LogisticRegression and our admm against a baseline of no-overflow, to a copy of the dataset with n values in a range that will overflow when passed through the sigmoid function.

import warnings

import dask.array as da
import numpy as np
from scipy.optimize import fmin_l_bfgs_b
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from dask_glm.algorithms import admm

warnings.simplefilter("ignore", RuntimeWarning)

C = 10
lambda_ = 1 / C
np.random.seed(2)
X, y = datasets.make_classification(n_classes=2, n_samples=1000)

Xda = da.from_array(X, (10, X.shape[1]))
yda = da.from_array(y, (10,))

lrbase = LogisticRegression(fit_intercept=False, C=C, max_iter=10).fit(X, y)
admm_base = admm(Xda, yda, lamduh=lambda_, max_iter=10)


def fit_and_compare(X, y, n):
    X = X.copy()
    np.random.seed(2)
    idx = np.random.choice(np.arange(len(X)), size=(n,))
    X[idx, 1] = np.random.randint(750, 1000, size=(n,))

    lr = LogisticRegression(fit_intercept=False, C=C, max_iter=10)
    lr.fit(X, y)

    Xda = da.from_array(X, (10, X.shape[1]))
    yda = da.from_array(y, (10,))
    result = admm(Xda, yda, lamduh=lambda_, max_iter=10)

    print(f"Overflow={n}")
    print("Sklearn:", np.mean(np.abs(lrbase.coef_ - lr.coef_)))
    print("admm   :", np.mean(np.abs(admm_base - result)))
    return lr.coef_, result


fit_and_compare(X, y, 0)
fit_and_compare(X, y, 5)
fit_and_compare(X, y, 10)
fit_and_compare(X, y, 50)
fit_and_compare(X, y, 100)

Outputs

Overflow=0
Sklearn: 6.21681525703e-17
admm   : 1.04008099876e-13
Overflow=5
Sklearn: 0.00377521192273
admm   : 0.0975875697572
Overflow=10
Sklearn: 0.00311021598129
admm   : 0.118998783966
Overflow=50
Sklearn: 0.00055956264663
admm   : 0.178748360839
Overflow=100
Sklearn: 0.00312859463805
admm   : 0.196356993923

Changes to ENet documentation

Hello!

I've made some changes to the ENet prox operator documentation, and I'd like to submit a pull request and have it looked at.

I've put it in my own branch on my machine - but when I try to make a PR I get told that I'm not authenticated to do that.

More generally, I've got time until the new year to write some more documentation or I could re implement some of my PhD work on fully distributed ADMM (but I'd need help!).

Tom

BFGS: to keep or not to keep?

Currently, the implementation of bfgs isn't always converging, and the implementation is messy. Should we spend some time working on this, or toss it? If we're going to keep it, what exactly is going on with the implementation?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.