Giter Site home page Giter Site logo

sk-dist's Introduction

sk-dist

sk-dist: Distributed scikit-learn meta-estimators in PySpark

License PyPI Package Downloads Python Versions

What is it?

sk-dist is a Python package for machine learning built on top of scikit-learn and is distributed under the Apache 2.0 software license. The sk-dist module can be thought of as "distributed scikit-learn" as its core functionality is to extend the scikit-learn built-in joblib parallelization of meta-estimator training to spark. A popular use case is the parallelization of grid search as shown here:

sk-dist

Check out the blog post for more information on the motivation and use cases of sk-dist.

Main Features

  • Distributed Training - sk-dist parallelizes the training of scikit-learn meta-estimators with PySpark. This allows distributed training of these estimators without any constraint on the physical resources of any one machine. In all cases, spark artifacts are automatically stripped from the fitted estimator. These estimators can then be pickled and un-pickled for prediction tasks, operating identically at predict time to their scikit-learn counterparts. Supported tasks are:
  • Distributed Prediction - sk-dist provides a prediction module which builds vectorized UDFs for PySpark DataFrames using fitted scikit-learn estimators. This distributes the predict and predict_proba methods of scikit-learn estimators, enabling large scale prediction with scikit-learn.
  • Feature Encoding - sk-dist provides a flexible feature encoding utility called Encoderizer which encodes mix-typed feature spaces using either default behavior or user defined customizable settings. It is particularly aimed at text features, but it additionally handles numeric and dictionary type feature spaces.

Installation

Dependencies

sk-dist requires:

Dependency Notes

  • versions of numpy, scipy and joblib that are compatible with any supported version of scikit-learn should be sufficient for sk-dist
  • sk-dist is not supported with Python 2

Spark Dependencies

Most sk-dist functionality requires a spark installation as well as PySpark. Some functionality can run without spark, so spark related dependencies are not required. The connection between sk-dist and spark relies solely on a sparkContext as an argument to various sk-dist classes upon instantiation.

A variety of spark configurations and setups will work. It is left up to the user to configure their own spark setup. The testing suite runs spark 2.4 and spark 3.0, though any spark 2.0+ versions are expected to work.

Additional spark related dependecies are pyarrow, which is used only for skdist.predict functions. This uses vectorized pandas UDFs which require pyarrow>=0.8.0, tested with pyarrow==0.16.0. Depending on the spark version, it may be necessary to set spark.conf.set("spark.sql.execution.arrow.enabled", "true") in the spark configuration.

User Installation

The easiest way to install sk-dist is with pip:

pip install --upgrade sk-dist

You can also download the source code:

git clone https://github.com/Ibotta/sk-dist.git

Testing

With pytest installed, you can run tests locally:

pytest sk-dist

Examples

The package contains numerous examples on how to use sk-dist in practice. Examples of note are:

Gradient Boosting

sk-dist has been tested with a number of popular gradient boosting packages that conform to the scikit-learn API. This includes xgboost and catboost. These will need to be installed in addition to sk-dist on all nodes of the spark cluster via a node bootstrap script. Version compatibility is left up to the user.

Support for lightgbm is not guaranteed, as it requires additional installations on all nodes of the spark cluster. This may work given proper installation but has not beed tested with sk-dist.

Background

The project was started at Ibotta Inc. on the machine learning team and open sourced in 2019.

It is currently maintained by the machine learning team at Ibotta. Special thanks to those who contributed to sk-dist while it was initially in development at Ibotta:

Thanks to James Foley for logo artwork.

IbottaML

sk-dist's People

Contributors

cfrazier91 avatar chadfoley36 avatar denver1117 avatar mattreyuk avatar rpcrimi avatar synapticarbors avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

sk-dist's Issues

_index_param_value not defined when searching params for CatBoostClassifier

Getting a NameError: name '_index_param_value' is not defined error while fitting DistRandomizedSearchCV on a CatBoostClassifier model. Here is some sample code:

params = {
    'depth': [4,6,8],
    'learning_rate': [0.02,0.05,0.1],
    'l2_leaf_reg': [1,5,10],
    'random_strength': [1,5,10],
    'bagging_temperature': [1,5,10],
    'one_hot_max_size': [2,10,50],
    'logging_level': ['Silent'],
    'use_best_model': [True],
    'od_type': ['Iter'],
    'od_wait': [20],
    'eval_metric': ['F1'],
    'iterations': [2000],
    'scale_pos_weight': [scale_multiplier],
    'colsample_bylevel': [0.6,0.8,1],
    'random_state': [42],
    'cat_features': [categorical_features_indices]
    }

model = DistRandomizedSearchCV(
    CatBoostClassifier(),
    params, sc, cv=3, scoring='roc_auc', n_iter=10, verbose=1
    )

model.fit(X_train, y_train, eval_set=(X_validation, y_validation))

Possibly an error on my part. But the error message implies _index_param_value needs to be defined, which appears to be missing in the source code.

I am able to run a DistRandomizedSearchCV search if I drop the eval_set and other overfitting detection parameters. However, it would be great to be able to utilized CatBoost's native overfitting detection.

Warnings in Pyspark Tests

Describe the bug
There are many warnings in the testing suite for the pyspark tests.

To Reproduce
These can be seen directly in the travis logs: https://travis-ci.org/Ibotta/sk-dist

Expected behavior
Ideally, we have no warnings in the test suite.

Additional context
Some warnings are due to pyspark code under the hood which we do not have control over.

Pyarrow version 0.16.0 is not working with test suite

Describe the bug
The test suite cannot import pyarrow (0.16.0) in python 3.5 or python 3.7. There do not seem to be any errors in the installation.
Failing travis build: https://travis-ci.org/Ibotta/sk-dist/jobs/648507535?utm_medium=notification&utm_source=github_status
Addressed by pinning pyarrow in this PR: #36

To Reproduce
Steps to reproduce the behavior:

  1. Unpin pyarrow in setup.py
  2. Run the travis build

Expected behavior
Ideally we'd be able to unpin pyarrow in the test suite and regularly test on the most recent version (0.16.0)

Additional context
This was revealed when pyarrow released 0.16.0 on 2/7/2020

Spark local-mode testing for Travis CI

Is your feature request related to a problem? Please describe.
The current test suite is meant to run only in a python environment after setup/installation. This doesn't allow testing Spark functionality in a spark environment.

Describe the solution you'd like
Ideally, we'd have either a separate test suite, or the entire test suite would be meant to run only in a Spark environment, setup locally by Travis CI.

Additional context
Most functionality can be tested without Spark, but the Spark parallelization itself cannot be tested, and Spark version compatibility cannot be tested. This would make the testing suite more robust to both sk-dist code changes and Spark/PySpark version changes/updates.

Simple Voter Class Error - TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'

After creating a pipeline with my best models and including the feature-transformer I used to create the training data... I get the following error:

TypeError: Cannot cast array data from dtype('O') to dtype('int64') according to the rule 'safe'

My feature pipeline makes use of StandardScaler, OneHotEncoder, HashingVectorizer and uses make_column_transformer to make a Pandas column transformer.

ensemble_pipeline = make_pipeline(
    feature_pipeline,
    SimpleVoter(
    best_models, 
    classes=best_models[0][1].classes_, voting="hard"
    ),
)

Any idea the reason behind this error?

Thanks!

Support for sklearn 0.24

Is your feature request related to a problem? Please describe.
sklearn updated to 0.24 and skdist is not compatible

Describe the solution you'd like
Full support for sklearn 0.24

Additional context
This only impacts the ensemble module. Maybe there is a workaround?

Any chance to use also the Pandas_UDF interface for even faster speeds?

Is your feature request related to a problem? Please describe.
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]

Not a problem per se. But Spark UDFs are slower than pyspark Pandas_UDFs. And both are slower than Scala UDFs
Pandas_udfs however are in python and use the pandas interface internally so they are easier to code.

Describe the solution you'd like
A clear and concise description of what you want to happen.

Any chance that you could add functionality so things can be achieved via the Pandas_UDF interface

Describe alternatives you've considered
A clear and concise description of any alternative solutions or features you've considered.

Additional context
Add any other context or screenshots about the feature request here.

This is how PANDAS_UDFs work internally.

image

For more info:
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

cannot get answer with LGBMClassifier

I change xgb to lgb but can't get any return

it cost 1sec on GridSearchCV

grid=dict(num_leaves=[8,15,31],
     n_estimators=[100, 200, 300])
for _ in trange(1):
    model_lgb = GridSearchCV(
        LGBMClassifier(),
        grid, n_jobs=4, cv=3
        )
    model_lgb.fit(X,y)

but no return in 10 min with DistGridSearchCV

grid=dict(num_leaves=[8,15,31],
     n_estimators=[100, 200, 300],
         n_jobs=1)
for _ in trange(1):
    model_lgb = DistGridSearchCV(
        LGBMClassifier(),
        grid, sc, cv=3,n_jobs=1
        )
    model_lgb.fit(X,y)

Python REPL connection issues

Describe the bug

When performing a GridSearch (or distributed or randomised), the tasks finish but the connection is never returned. Eventually the connection is closed with an out-of-memory exception. It does not appear to be an out-of-memory error as the tasks complete.

To Reproduce
Steps to reproduce the behavior:

Train a large Tree Based model across worker nodes in a grid search. It seems to only occur when a larger amount of time is required to construct the model.

Expected behavior

Additional context

Seems related to: apache/spark#24898

I replaced sk-dist with joblibspark, trained the same model which completed (joblib reports tasks done) and received the error:

because pyspark py4j is not in pinned thread mode, we could not terminate running spark jobs correctly.

PR Template

Is your feature request related to a problem? Please describe.
We need a PR template.

Describe the solution you'd like
Boilerplate is fine.

Ibotta no longer supports Travis CI

Describe the bug
This library uses Travis CI which is no longer supported by Ibotta (switched to GitHub Actions). In addition, the CI process has been failing for a long time due to requiring obsolete versions of Spark and Python.

To Reproduce
Check CI builds

Expected behavior
CI should build without issue and run on Ibotta supported tooling

Additional context
I can help switch to GitHub Actions if required

PicklingError: Could not serialize broadcast

When I use a Tree Method (such as ExtraTrees or RandomForest) rather than another classification algorithm, I seem to run into the following error:

PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

predict = get_prediction_udf(model, method="predict", feature_type="pandas", names=list(X_train.columns))

cols = [F.col(str(c)) for c in list(X_train.columns)]
# apply predict UDFs and select prediction output
prediction_df = (
    unlabelled
    .withColumn("pred", predict(*cols))
    )

Traceback below:

---------------------------------------------------------------------------
OverflowError                             Traceback (most recent call last)
/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
    120         try:
--> 121             pickle.dump(value, f, 2)
    122         except pickle.PickleError:

OverflowError: cannot serialize a string larger than 4GiB

During handling of the above exception, another exception occurred:

PicklingError                             Traceback (most recent call last)
<command-1349179484758480> in <module>()
      3 prediction_df = (
      4     unlabelled
----> 5     .withColumn("pred", predict(*cols))
      6     )

/databricks/spark/python/pyspark/sql/udf.py in wrapper(*args)
    194         @functools.wraps(self.func, assigned=assignments)
    195         def wrapper(*args):
--> 196             return self(*args)
    197 
    198         wrapper.__name__ = self._name

/databricks/spark/python/pyspark/sql/udf.py in __call__(self, *cols)
    172 
    173     def __call__(self, *cols):
--> 174         judf = self._judf
    175         sc = SparkContext._active_spark_context
    176         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

/databricks/spark/python/pyspark/sql/udf.py in _judf(self)
    156         # and should have a minimal performance impact.
    157         if self._judf_placeholder is None:
--> 158             self._judf_placeholder = self._create_judf()
    159         return self._judf_placeholder
    160 

/databricks/spark/python/pyspark/sql/udf.py in _create_judf(self)
    165         sc = spark.sparkContext
    166 
--> 167         wrapped_func = _wrap_function(sc, self.func, self.returnType)
    168         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
    169         judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(

/databricks/spark/python/pyspark/sql/udf.py in _wrap_function(sc, func, returnType)
     33 def _wrap_function(sc, func, returnType):
     34     command = (func, returnType)
---> 35     pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
     36     return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
     37                                   sc.pythonVer, broadcast_vars, sc._javaAccumulator)

/databricks/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command)
   2464     if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
   2465         # The broadcast will have same life cycle as created PythonRDD
-> 2466         broadcast = sc.broadcast(pickled_command)
   2467         pickled_command = ser.dumps(broadcast)
   2468     broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]

/databricks/spark/python/pyspark/context.py in broadcast(self, value)
    897         be sent to each cluster only once.
    898         """
--> 899         return Broadcast(self, value, self._pickled_broadcast_vars)
    900 
    901     def accumulator(self, value, accum_param=None):

/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path, sock_file)
     97                 # no encryption, we can just write pickled data directly to the file from python
     98                 broadcast_out = f
---> 99             self.dump(value, broadcast_out)
    100             if sc._encryption_enabled:
    101                 self._python_broadcast.waitTillDataReceived()

/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
    126                   % (e.__class__.__name__, _exception_message(e))
    127             print_exec(sys.stderr)
--> 128             raise pickle.PicklingError(msg)
    129         f.close()
    130 

PicklingError: Could not serialize broadcast: OverflowError: cannot serialize a string larger than 4GiB

OSError: [WinError 123]: Spark on Windows local

I'm trying to execute an expample:

spark Version: 2.3.4

import timefrom sklearn import datasets, svm
from skdist.distribute.search import DistGridSearchCV
from pyspark.sql import SparkSession # instantiate spark session
spark = (   
    SparkSession    
    .builder    
    .getOrCreate()    
    )
sc = spark.sparkContext # the digits dataset
digits = datasets.load_digits()
X = digits["data"]
y = digits["target"] # create a classifier: a support vector classifier
classifier = svm.SVC()
param_grid = {
    "C": [0.01, 0.01, 0.1, 1.0, 10.0, 20.0, 50.0], 
    "gamma": ["scale", "auto", 0.001, 0.01, 0.1], 
    "kernel": ["rbf", "poly", "sigmoid"]
    }
scoring = "f1_weighted"
cv = 10# hyperparameter optimization
start = time.time()
model = DistGridSearchCV(    
    classifier, param_grid,     
    sc=sc, cv=cv, scoring=scoring,
    verbose=True    
    )

when I try to train my model, model.fit(X,y) it's fails with

OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar'

The Spak_Home is set as "C:\spark" and the PATH %SPARK_HOME%\bin

Without SparkContext I'm able to run the code

model = DistGridSearchCV(    
    classifier, param_grid,     
    **sc=sc**, cv=cv, scoring=scoring)

to:

model = DistGridSearchCV(    
    classifier, param_grid,     
    cv=cv, scoring=scoring)

I did also try to pass the "spakHome" variable when defnied the SparkContext without the driver letter "C:"

SparkContext(appName="Dist_Exmp",sparkHome="spark")
sc.sparkHome --> spark

But the variable is taken from the EvnVar.

Here the whole Trace:

`---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
in
----> 1 model.fit(X_train, y_train)

C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py in fit(self, X, y, groups, **fit_params)
367 base_estimator_ = self.sc.broadcast(base_estimator)
368 partitions = _parse_partitions(self.partitions, len(fit_sets))
--> 369 out = self.sc.parallelize(fit_sets, numSlices=partitions).map(lambda x: [x[0], fit_and_score(
370 base_estimator
, X, y, scorers, x[2][0], x[2][1],
371 verbose, x[1], fit_params=fit_params,

C:\MeineProgramme\anaconda3\lib\site-packages\pyspark\rdd.py in collect(self)
812 """
813 with SCCallSiteSync(self.context) as css:
--> 814 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
815 return list(_load_from_socket(sock_info, self._jrdd_deserializer))
816

C:\MeineProgramme\anaconda3\lib\site-packages\py4j\java_gateway.py in call(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:

C:\MeineProgramme\anaconda3\lib\site-packages\pyspark\sql\utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()

C:\MeineProgramme\anaconda3\lib\site-packages\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 1.0 failed 1 times, most recent failure: Lost task 1.0 in stage 1.0 (TID 6, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 240, in main
File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command
File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in read_with_length
return self.loads(obj)
File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads
return pickle.loads(obj, encoding=encoding)
File "C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py", line 14, in
from sklearn.model_selection import (
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_init
.py", line 19, in
from .validation import cross_val_score
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_validation.py", line 27, in
from ..metrics.scorer import check_scoring, check_multimetric_scoring
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics_init
.py", line 7, in
from .ranking import auc
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in
from ..preprocessing import label_binarize
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_init
.py", line 6, in
from .function_transformer import FunctionTransformer
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_function_transformer.py", line 5, in
from ..utils.testing import assert_allclose_dense_sparse
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in
import pytest
File "C:\MeineProgramme\anaconda3\lib\site-packages\pytest.py", line 6, in
from pytest.assertion import register_assert_rewrite
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion_init
.py", line 7, in
from pytest.assertion import rewrite
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\rewrite.py", line 26, in
from pytest.assertion import util
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\util.py", line 8, in
import pytest.code
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code_init
.py", line 2, in
from .code import Code # noqa
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code\code.py", line 23, in
import pluggy
File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy_init
.py", line 16, in
from .manager import PluginManager, PluginValidationError
File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy\manager.py", line 11, in
import importlib_metadata
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 547, in
version = version(name)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 509, in version
return distribution(distribution_name).version
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 482, in distribution
return Distribution.from_name(distribution_name)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 183, in from_name
dist = next(dists, None)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 425, in
for path in map(cls.switch_path, paths)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init
.py", line 449, in _search_path
if not root.is_dir():
File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1358, in is_dir
return S_ISDIR(self.stat().st_mode)
File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1168, in stat
return self._accessor.stat(self)
OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:336)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:475)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:458)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1661)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1649)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1648)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1648)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1882)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1820)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:165)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 240, in main
File "C:\SPARK\python\lib\pyspark.zip\pyspark\worker.py", line 60, in read_command
File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 171, in read_with_length
return self.loads(obj)
File "C:\SPARK\python\lib\pyspark.zip\pyspark\serializers.py", line 566, in loads
return pickle.loads(obj, encoding=encoding)
File "C:\MeineProgramme\anaconda3\lib\site-packages\skdist\distribute\search.py", line 14, in
from sklearn.model_selection import (
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_init
.py", line 19, in
from .validation import cross_val_score
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\model_selection_validation.py", line 27, in
from ..metrics.scorer import check_scoring, check_multimetric_scoring
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics_init
.py", line 7, in
from .ranking import auc
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\metrics\ranking.py", line 35, in
from ..preprocessing import label_binarize
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_init
.py", line 6, in
from .function_transformer import FunctionTransformer
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\preprocessing_function_transformer.py", line 5, in
from ..utils.testing import assert_allclose_dense_sparse
File "C:\MeineProgramme\anaconda3\lib\site-packages\sklearn\utils\testing.py", line 718, in
import pytest
File "C:\MeineProgramme\anaconda3\lib\site-packages\pytest.py", line 6, in
from pytest.assertion import register_assert_rewrite
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion_init
.py", line 7, in
from pytest.assertion import rewrite
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\rewrite.py", line 26, in
from pytest.assertion import util
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest\assertion\util.py", line 8, in
import pytest.code
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code_init
.py", line 2, in
from .code import Code # noqa
File "C:\MeineProgramme\anaconda3\lib\site-packages_pytest_code\code.py", line 23, in
import pluggy
File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy_init
.py", line 16, in
from .manager import PluginManager, PluginValidationError
File "C:\MeineProgramme\anaconda3\lib\site-packages\pluggy\manager.py", line 11, in
import importlib_metadata
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 547, in
version = version(name)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 509, in version
return distribution(distribution_name).version
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init.py", line 482, in distribution
return Distribution.from_name(distribution_name)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 183, in from_name
dist = next(dists, None)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init_.py", line 425, in
for path in map(cls.switch_path, paths)
File "C:\MeineProgramme\anaconda3\lib\site-packages\importlib_metadata_init
.py", line 449, in _search_path
if not root.is_dir():
File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1358, in is_dir
return S_ISDIR(self.stat().st_mode)
File "C:\MeineProgramme\anaconda3\lib\pathlib.py", line 1168, in stat
return self._accessor.stat(self)
OSError: [WinError 123] Die Syntax für den Dateinamen, Verzeichnisnamen oder die Datenträgerbezeichnung ist falsch: 'C:\C:\spark\jars\spark-core_2.11-2.3.4.jar'

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:336)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:475)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:458)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:290)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more`

Full pipeline tuning?

Does DistGridSearchCV allow for full pipeline tuning? (it doesn't appear to unless I am missing something?)

It would be pretty useful to tune some of the feature extraction parts e.g. number of ngram -hashed features.

  • would just like to state: well done with this project. It's excellent and I appreciate that care and time has been taken to ensure it works as intended.

Multi-Metric Evaluation with DistGridSearchCV results in NameError

Describe the bug
Using multi-metric scoring in DistGridSearchCV results in an NameError:
File "/home//.local/lib/python3.6/site-packages/skdist/distribute/search.py", line 315, in fit
not isinstance(self.refit, six.string_types) or
NameError: name 'six' is not defined

To Reproduce
Steps to reproduce the behavior:
Create a DistGridSearchCV:

GS_EVALUATION_METRICS_DICT = {
'accuracy' : 'accuracy',
'roc_auc' : 'roc_auc'
}

model = GaussianNB()
model_param_grid: {'var_smoothing': [1e-08, 0.0001, 0.01]}

grid_search = DistGridSearchCV(estimator=model, 
                           param_grid=model_param_grid,
                           sc=sc, 
                           scoring=GS_EVALUATION_METRICS_DICT, 
                           n_jobs=6, 
                           pre_dispatch=6,
                           cv=3, 
                           refit='roc_auc',
                           verbose=1,
                           error_score=0,
                           return_train_score=True,
                           )

Expected behavior
A clear and concise description of what you expected to happen.
No NameError

Additional context
I think the error is easily fixable --> Add the import of the six library

sk-dist Conda package

Hi,
I am sorry for not contacting you sooner, but I needed quite urgently to import sk-dist as a Conda package, so I created Conda-forge feedstock for your package. As this package does not have any special dependencies, this was a rather straightforward process. This means that sk-dist is now available as a conda package through conda-forge. Right now I am the maintainer of this feedstock, but I will be happy to forward it to you or add you as a maintainer.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.