Giter Site home page Giter Site logo

dvgodoy / handyspark Goto Github PK

View Code? Open in Web Editor NEW
183.0 15.0 22.0 1.72 MB

HandySpark - bringing pandas-like capabilities to Spark dataframes

License: MIT License

Python 47.50% Jupyter Notebook 52.50%
spark pandas visualization python pyspark exploratory-data-analysis imputation outlier-detection

handyspark's Introduction

Build Status

HandySpark

Bringing pandas-like capabilities to Spark dataframes!

HandySpark is a package designed to improve PySpark user experience, especially when it comes to exploratory data analysis, including visualization capabilities!

It makes fetching data or computing statistics for columns really easy, returning pandas objects straight away.

It also leverages on the recently released pandas UDFs in Spark to allow for an out-of-the-box usage of common pandas functions in a Spark dataframe.

Moreover, it introduces the stratify operation, so users can perform more sophisticated analysis, imputation and outlier detection on stratified data without incurring in very computationally expensive groupby operations.

It brings the long missing capability of plotting data while retaining the advantage of performing distributed computation (unlike many tutorials on the internet, which just convert the whole dataset to pandas and then plot it - don't ever do that!).

Finally, it also extends evaluation metrics for binary classification, so you can easily choose which threshold to use!

Google Colab

Eager to try it out right away? Don't wait any longer!

Open the notebook directly on Google Colab and try it yourself:

Installation

To install HandySpark from PyPI, just type:

pip install handyspark

Documentation

You can find the full documentation here.

Here is a handy list of direct links to some classes, objects and methods used:

Quick Start

To use HandySpark, all you need to do is import the package and, after loading your data into a Spark dataframe, call the toHandy() method to get your own HandyFrame:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from handyspark import *
sdf = spark.read.csv('./tests/rawdata/train.csv', header=True, inferSchema=True)
hdf = sdf.toHandy()

Fetching and plotting data

Now you can easily fetch data as if you were using pandas, just use the cols object from your HandyFrame:

hdf.cols['Name'][:5]

It should return a pandas Series object:

0                              Braund, Mr. Owen Harris
1    Cumings, Mrs. John Bradley (Florence Briggs Th...
2                               Heikkinen, Miss. Laina
3         Futrelle, Mrs. Jacques Heath (Lily May Peel)
4                             Allen, Mr. William Henry
Name: Name, dtype: object

If you include a list of columns, it will return a pandas DataFrame.

Due to the distributed nature of data in Spark, it is only possible to fetch the top rows of any given HandyFrame.

Using cols you have access to several pandas-like column and DataFrame based methods implemented in Spark:

  • min / max / median / q1 / q3 / stddev / mode
  • nunique
  • value_counts
  • corr
  • hist
  • boxplot
  • scatterplot

For instance:

hdf.cols['Embarked'].value_counts(dropna=False)
S      644
C      168
Q       77
NaN      2
Name: Embarked, dtype: int64

You can also make some plots:

from matplotlib import pyplot as plt
fig, axs = plt.subplots(1, 4, figsize=(12, 4))
hdf.cols['Embarked'].hist(ax=axs[0])
hdf.cols['Age'].boxplot(ax=axs[1])
hdf.cols['Fare'].boxplot(ax=axs[2])
hdf.cols[['Fare', 'Age']].scatterplot(ax=axs[3])

cols plots

Handy, right (pun intended!)? But things can get even more interesting if you use stratify!

Stratify

Stratifying a HandyFrame means using a split-apply-combine approach. It will first split your HandyFrame according to the specified (discrete) columns, then it will apply some function to each stratum of data and finally combine the results back together.

This is better illustrated with an example - let's try the stratified version of our previous value_counts:

hdf.stratify(['Pclass']).cols['Embarked'].value_counts()
Pclass  Embarked
1       C            85
        Q             2
        S           127
2       C            17
        Q             3
        S           164
3       C            66
        Q            72
        S           353
Name: value_counts, dtype: int64

Cool, isn't it? Besides, under the hood, not a single group by operation was performed - everything is handled using filter clauses! So, no data shuffling!

What if you want to stratify on a column containing continuous values? No problem!

hdf.stratify(['Sex', Bucket('Age', 2)]).cols['Embarked'].value_counts()
Sex     Age                                Embarked
female  Age >= 0.4200 and Age < 40.2100    C            46
                                           Q            12
                                           S           154
        Age >= 40.2100 and Age <= 80.0000  C            15
                                           S            32
male    Age >= 0.4200 and Age < 40.2100    C            53
                                           Q            11
                                           S           287
        Age >= 40.2100 and Age <= 80.0000  C            16
                                           Q             5
                                           S            81
Name: value_counts, dtype: int64

You can use either Bucket or Quantile to discretize your data in any given number of bins!

What about plotting it? Yes, HandySpark can handle that as well!

hdf.stratify(['Sex', Bucket('Age', 2)]).cols['Embarked'].hist(figsize=(8, 6))

stratified hist

Handling missing data

HandySpark makes it very easy to spot and fill missing values. To figure if there are any missing values, just use isnull:

hdf.isnull(ratio=True)
PassengerId    0.000000
Survived       0.000000
Pclass         0.000000
Name           0.000000
Sex            0.000000
Age            0.198653
SibSp          0.000000
Parch          0.000000
Ticket         0.000000
Fare           0.000000
Cabin          0.771044
Embarked       0.002245
Name: missing(ratio), dtype: float64

Ok, now you know there are 3 columns with missing values: Age, Cabin and Embarked. It's time to fill those values up! But, let's skip Cabin, which has 77% of its values missing!

So, Age is a continuous variable, while Embarked is a categorical variable. Let's start with the latter:

hdf_filled = hdf.fill(categorical=['Embarked'])

HandyFrame has a fill method which takes up to 3 arguments:

  • categorical: a list of categorical variables
  • continuous: a list of continuous variables
  • strategy: which strategy to use for each one of the continuous variables (either mean or median)

Categorical variables use a mode strategy by default.

But you do not need to stick with the basics anymore... you can fancy it up using stratify together with fill:

hdf_filled = hdf_filled.stratify(['Pclass', 'Sex']).fill(continuous=['Age'], strategy=['mean'])

How do you know which values are being used? Simple enough:

hdf_filled.statistics_
{'Age': {'Pclass == "1" and Sex == "female"': 34.61176470588235,
  'Pclass == "1" and Sex == "male"': 41.28138613861386,
  'Pclass == "2" and Sex == "female"': 28.722972972972972,
  'Pclass == "2" and Sex == "male"': 30.74070707070707,
  'Pclass == "3" and Sex == "female"': 21.75,
  'Pclass == "3" and Sex == "male"': 26.507588932806325},
 'Embarked': 'S'}

There you go! The filter clauses and the corresponding imputation values!

But there is more - once you're with your imputation procedure, why not generate a custom transformer to do that for you, either on your test set or in production?

You only need to call the imputer method of the transformer object that every HandyFrame has:

imputer = hdf_filled.transformers.imputer()

In the example above, imputer is now a full-fledged serializable PySpark transformer! What does that mean? You can use it in your pipeline and save / load at will :-)

Detecting outliers

Second only to the problem of missing data, outliers can pose a challenge for training machine learning models.

HandyFrame to the rescue, with its outliers method:

hdf_filled.outliers(method='tukey', k=3.)
PassengerId      0.0
Survived         0.0
Pclass           0.0
Age              1.0
SibSp           12.0
Parch          213.0
Fare            53.0
dtype: float64

Currently, only Tukey's method is available. This method takes an optional k argument, which you can set to larger values (like 3) to allow for a more loose detection.

The good thing is, now we can take a peek at the data by plotting it:

from matplotlib import pyplot as plt
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
hdf_filled.cols['Parch'].hist(ax=axs[0])
hdf_filled.cols['SibSp'].hist(ax=axs[1])
hdf_filled.cols['Age'].boxplot(ax=axs[2], k=3)
hdf_filled.cols['Fare'].boxplot(ax=axs[3], k=3)

outliers

Let's focus on the Fare column - what can we do about it? Well, we could use Tukey's fences to, er... fence the outliers :-)

hdf_fenced = hdf_filled.fence(['Fare'])

Which values were used, you ask?

hdf_fenced.fences_
{'Fare': [-26.0105, 64.4063]}

It works quite similarly to the fill method and, I hope you guessed, it also gives you the ability to create the corresponding custom transformer :-)

fencer = hdf_fenced.transformers.fencer()

You can also use Mahalanobis distance to identify outliers in a multi-dimensional space, given a critical value (usually 99.9%, but you are free to have either more restriced or relaxed threshold).

To get the outliers for a subset of columns (only numerical columns are considered!):

outliers = hdf_filled.cols[['Age', 'Fare', 'SibSp']].get_outliers(critical_value=.90)

Let's take a look at the first 5 outliers found:

outliers.cols[:][:5]

outliers

What if you want to discard these sample? You just need to call remove_outliers:

hdf_without_outliers = hdf_filled.cols[['Age', 'Fare', 'SibSp']].remove_outliers(critical_value=0.90)

Evaluating your model!

You cleaned your data, you trained your classification model, you fine-tuned it and now you want to evaluate it, right?

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator

assem = VectorAssembler(inputCols=['Fare', 'Pclass', 'Age'], outputCol='features')
rf = RandomForestClassifier(featuresCol='features', labelCol='Survived', numTrees=20)
pipeline = Pipeline(stages=[assem, rf])
model = pipeline.fit(hdf_fenced)

predictions = model.transform(hdf_fenced)
evaluator = BinaryClassificationEvaluator(labelCol='Survived')
evaluator.evaluate(predictions)

Then you realize evaluators only give you areaUnderROC and areaUnderPR. How about plotting ROC or PR curves? How about finding a threshold that suits your needs for False Positive or False negatives?

HandySpark extends the BinaryClassificationMetrics object to take DataFrames and output all your evaluation needs!

bcm = BinaryClassificationMetrics(predictions, scoreCol='probability', labelCol='Survived')

Now you can plot the curves...

from matplotlib import pyplot as plt
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

curves

...or get metrics for every threshold...

bcm.getMetricsByThreshold().toPandas()[100:105]

metrics

...or the confusion matrix for the threshold you chose:

bcm.print_confusion_matrix(.572006)

cm

Pandas and more pandas!

With HandySpark you can feel almost as if you were using traditional pandas :-)

To gain access to the whole suite of available pandas functions, you need to leverage the pandas object of your HandyFrame:

some_ports = hdf_fenced.pandas['Embarked'].isin(values=['C', 'Q'])
some_ports
Column<b'udf(Embarked) AS `<lambda>(Embarked,)`'>

In the example above, HandySpark treats the Embarked column as if it were a pandas Series and, therefore, you may call its isin method!

But, remember Spark has lazy evaluation, so the result is a column expression which leverages the power of pandas UDFs (provived that PyArrow is installed, otherwise it will fall back to traditional UDFs).

The only thing left to do is to actually assign the results to a new column, right?

hdf_fenced = hdf_fenced.assign(is_c_or_q=some_ports)
# What's in there?
hdf_fenced.cols['is_c_or_q'][:5]
0     True
1    False
2    False
3     True
4     True
Name: is_c_or_q, dtype: bool

You got that right! HandyFrame has a very convenient assign method, just like in pandas!

It does not get much easier than that :-) There are several column methods available already:

  • betweeen / between_time
  • isin
  • isna / isnull
  • notna / notnull
  • abs
  • clip / clip_lower / clip_upper
  • replace
  • round / truncate
  • tz_convert / tz_localize

And this is not all! Both specialized str and dt objects from pandas are available as well!

For instance, if you want to find if a given string contains another substring?

col_mrs = hdf_fenced.pandas['Name'].str.find(sub='Mrs.')
hdf_fenced = hdf_fenced.assign(is_mrs=col_mrs > 0)

is mrs

There are many, many more available methods:

  1. String methods:
  • contains
  • startswith / endswitch
  • match
  • isalpha / isnumeric / isalnum / isdigit / isdecimal / isspace
  • islower / isupper / istitle
  • replace
  • repeat
  • join
  • pad
  • slice / slice_replace
  • strip / lstrip / rstrip
  • wrap / center / ljust / rjust
  • translate
  • get
  • normalize
  • lower / upper / capitalize / swapcase / title
  • zfill
  • count
  • find / rfind
  • len
  1. Date / Datetime methods:
  • is_leap_year / is_month_end / is_month_start / is_quarter_end / is_quarter_start / is_year_end / is_year_start
  • strftime
  • tz / time / tz_convert / tz_localize
  • day / dayofweek / dayofyear / days_in_month / daysinmonth
  • hour / microsecond / minute / nanosecond / second
  • week / weekday / weekday_name
  • month / quarter / year / weekofyear
  • date
  • ceil / floor / round
  • normalize

Your own functions

The sky is the limit! You can create regular Python functions and use assign to create new columns :-)

No need to worry about turning them into pandas UDFs - everything is handled by HandySpark under the hood!

The arguments of your function (or lambda) should have the names of the columns you want to use. For instance, to take the log of Fare:

import numpy as np
hdf_fenced = hdf_fenced.assign(logFare=lambda Fare: np.log(Fare + 1))

logfare

You can also use multiple columns:

hdf_fenced = hdf_fenced.assign(fare_times_age=lambda Fare, Age: Fare * Age)

Even though the result is kinda pointless, it will work :-)

Keep in mind that the return type, that is, the column type of the new column, will be the same as the first column used (Fare, in the example).

What if you want to return something of a different type?! No worries! You only need to wrap your function with the desired return type. An example should make this more clear:

from pyspark.sql.types import StringType

hdf_fenced = hdf_fenced.assign(str_fare=StringType.ret(lambda Fare: Fare.map('${:,.2f}'.format)))

hdf_fenced.cols['str_fare'][:5]
0    $65.66
1    $53.10
2    $26.55
3    $65.66
4    $65.66
Name: str_fare, dtype: object

Basically, we imported the desired output type - StringType - and used its extended method ret to wrap our lambda function that formats our numeric Fare column into a string.

It is also possible to create a more complex type, like an array of doubles:

from pyspark.sql.types import ArrayType, DoubleType

def make_list(Fare):
    return Fare.apply(lambda v: [v, v*2])

hdf_fenced = hdf_fenced.assign(fare_list=ArrayType(DoubleType()).ret(make_list))

hdf_fenced.cols['fare_list'][:5]
0           [7.25, 14.5]
1    [71.2833, 142.5666]
2         [7.925, 15.85]
3          [53.1, 106.2]
4           [8.05, 16.1]
Name: fare_list, dtype: object

OK, so, what happened here?

  1. First, we imported the necessary types, ArrayType and DoubleType, since we are building a function that returns a list of doubles.
  2. We actually built the function - notice that we call apply straight from Fare, which is treated as a pandas Series under the hood.
  3. We wrap the function with the return type ArrayType(DoubleType()) by invoking the extended method ret.
  4. Finally, we assign it to a new column name, and that's it!

Nicer exceptions

Now, suppose you make a mistake while creating your function... if you have used Spark for a while, you already realized that, when an exception is raised, it will be loooong, right?

To help you with that, HandySpark analyzes the error message and parses it nicely for you at the very top of the error message, in bold red:

exception

Safety first

HandySpark wants to protect your cluster and network, so it implements a safety whenever you perform an operation that are going to retrieve ALL data from your HandyFrame, like collect or toPandas.

How does that work? Every time a HandyFrame has one of these methods called, it will output up to the safety limit, which has a default of 1,000 elements.

safety on

Do you want to set a different safety limit for your HandyFrame?

safety limit

What if you want to retrieve everything nonetheless?! You can invoke the safety_off method prior to the actual method you want to call and you get a one-time unlimited result.

safety off

Don't feel like Handy anymore?

To get back your original Spark dataframe, you only need to call notHandy to make it not handy again:

hdf_fenced.notHandy()
DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string, logFare: double, is_c_or_q: boolean]

Comments, questions, suggestions, bugs

DISCLAIMER: this is a project under development, so it is likely you'll run into bugs/problems.

So, if you find any bugs/problems, please open an issue or submit a pull request.

handyspark's People

Contributors

dvgodoy avatar manuzhang avatar martinsotir avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

handyspark's Issues

koalas vs handyspark

it's awesome that you're adding plots (databricks/koalas#293) to koalas @dvgodoy ! I've been using handyspark for plotting with spark dataframes, will you be continuing the development of handyspark or would you recommend switching to koalas?

How to compute the median value of a column

Really nice work

How to compute the correct median value of a handyspark dataframe?
I tried to compute the median value of a column through pandas and I can the correct value, but when i compute the median value for the same column and same dataset through handyspark I get a different value. Any clue as to why this may happen?

Thanks!

need to have inline plots visuals on zeppelin ..

used the code as below
fig, axs = plt.subplots(1, 4, figsize=(16, 4))
hdf_filled.cols['Parch'].hist(ax=axs[0])
hdf_filled.cols['SibSp'].hist(ax=axs[1])
hdf_filled.cols['Age'].boxplot(ax=axs[2], k=3)
hdf_filled.cols['Fare'].boxplot(ax=axs[3], k=3)
plt.show()

the plots are shown on the popup windows rather than inline.
How to have plots inline on the zeppelin?

how to get get bins and counts instead of a plot

Getting this error when trying to get back bins and counts instead of a histogram plot...
module 'handyspark.plot' has no attribute 'stratified_histogram'

but other modules exist

help(handy.plot.histogram)
histogram(sdf, colname, bins=10, categorical=False, ax=None)

The doc pages lists this method in the plot module however ! Is there a recommended way of getting the bins and counts instead of a plot ?

remove pyspark dependency

I'm trying to use this package in a production environment, pyspark is provided as I'm submitting my application using spark submit. Handyspark however requires spark to be installed, which results in a conflict in Spark versions (my cluster runs Spark 2.3 and Handyspark pulls in Spark 2.4 since that's the latest stable).

I think this is a common scenario and it would be better if Handyspark doesn't depend on pyspark but simply assumes you'll use it in an environment where pyspark is available, either installed or added to that system path using findspark.

boxplot is failing

I am using PySpark version 3.01 on DataBricks 7.4.

I am getting this error when trying to do a boxplot (histograms work fine). I have tried manually casting DISTANCE as both as integer and a double, but both fail:

AnalysisException: cannot resolve 'approx_percentile(`DISTANCE`, CAST(0.25BD AS DOUBLE), 100.0BD)' due to data type mismatch: argument 3 requires integral type, however, '100.0BD' is of decimal(4,1) type.; line 1 pos 0;
---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
<command-2120656041886569> in <module>
      5 hdf.cols["ORIGIN_AIRPORT"].hist(ax=axs[1,0])
      6 hdf.cols["DESTINATION_AIRPORT"].hist(ax=axs[1,1])
----> 7 hdf.cols["DISTANCE"].boxplot(ax=axs[2,0])
      8 hdf.cols["plannedDepartTime"].boxplot(ax=axs[2,1])



root
 |-- dayOfWeek: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: integer (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- DISTANCE: double (nullable = true)
 |-- SCHEDULED_TIME: integer (nullable = true)
 |-- plannedDepartTime: integer (nullable = true)
 |-- label: integer (nullable = true)

pyqt4 error

Hello

We have tried to install the library at our Linux server that has Anaconda3 repository. It errors out asking for PyQT4 to be installed first. Since PyQT5 now is the standard for Anaconda3 and Anaconda2, I wonder if there is a way to install it without having to downgrade PyQT?

Access to filter method in handyspark API

Thanks for the great work!! , not sure why every other such module converts to Pandas by default, defeats the purpose imo. I'm trying it out currently.

I have a question around filtering a dataframe rows in Handy: Is it currently possible to filter rows based on column values directly instead of creating a column output and assigning it back as a new column in the dataframe ? Would be great to have a direct filter capability in the API or any workaround that doesn't need the user to use low level spark calls for filtering.

When column is of type int, histogram acts different from pandas dataframe

When I summon a hist from a Pandas column (Series) containing integers I get a proper histogram where the x axis is divided to bins of value ranges.
When I do the same using a handy DataFrame I get a categorical histogram.

I dug into the code and the reason for the way handy acts is that the column of integers is not defined as a member of the self._continuous group of columns.

hist uses the continuous list as an indication of using categorical for non continuous. This is why a hist of integers in handy is not what one would expect from a hist of integers in Pandas.

a workaround is to cast the integer column to floats. I think this is a bug (couldn't find anything in the docs).

Here's a quick repro code..

pdf = pd.DataFrame({'bobo': np.random.randint(0, 100, 5000)})
df = spark.createDataFrame(pdf).withColumn('float_bobo', F.col('bobo').astype('float'))
hdf = df.toHandy()
pdf.bobo.hist()
hdf.cols['bobo'].hist()
hdf.cols['float_bobo'].hist()

I forgot to congratulate you on this great lib, it really is cool!

Itamar

getMetricsByThreshold is failing

This method is failing because of TypeError in eveluation.py line 142.
Screenshot 2023-05-19 at 21 37 05

I believe in lines 141 and 142,
select(scoreCol, labelCol).rdd.map((lambda row:(float(row [scoreCol][1]) , float(row[labelCol]))) should change to the following:
select(scoreCol, labelCol).rdd.map((lambda row:(float(row [scoreCol]) , float(row[labelCol])))

Boxplot X label rotation

Is there a way to rotate x labels in grouped/stratified boxplots? I tried 'rot' argument from pandas boxplot, I also tried to assign ax=axs so I can set the labels afterwards, but it's not working for stratified boxplot.
(BTW thank you for this great tool!!!)

this works(no rotation):
pattern_time_ranges.stratify(['event_type']).cols['event_time_range'].boxplot(figsize=(16, 5))

this does not work:
fig, axs = plt.subplots(1, 1, figsize=(16, 5))
pattern_time_ranges.stratify(['event_type']).cols['event_time_range'].boxplot(ax=axs)
Error : NotImplementedError: TransformNode instances can not be copied. Consider using frozen() instead.

this does not work too:
pattern_time_ranges.stratify(['event_type']).cols['event_time_range'].boxplot(figsize=(16, 5), rot=90)

SyntaxError: only named arguments may follow *expression when use python2

When I use python2, I'll got the error as follow:

$ pyspark
Python 2.7.5 (default, Aug  4 2017, 00:39:18)
// comment:some pyspark output are omitted
Using Python version 2.7.5 (default, Aug  4 2017 00:39:18)
SparkSession available as 'spark'.
>>> import handyspark
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/handyspark/__init__.py", line 1, in <module>
    from handyspark.extensions.evaluation import BinaryClassificationMetrics
  File "/usr/lib/python2.7/site-packages/handyspark/extensions/__init__.py", line 2, in <module>
    from handyspark.extensions.evaluation import BinaryClassificationMetrics
  File "/usr/lib/python2.7/site-packages/handyspark/extensions/evaluation.py", line 3, in <module>
    from handyspark.plot import roc_curve, pr_curve
  File "/usr/lib/python2.7/site-packages/handyspark/plot.py", line 53
    splits = np.linspace(*sdf.agg(F.min(col), F.max(col)).rdd.map(tuple).collect()[0], n + 1)
SyntaxError: only named arguments may follow *expression

Python modules' info:

$ pip list |grep -E "spark"
handyspark (0.2.1a1)
pyspark (2.4.0)

As PEP 3132 says:

Only allow a starred expression as the last item in the exprlist. This would simplify the unpacking code a bit and allow for the starred expression to be assigned an iterator. This behavior was rejected because it would be too surprising.

This error only appear in python2.

Request Python2.6.6 version

Can you make a python2.6.6 version?
I work with a Cloudera Cluster that uses python2 and cannot be upgraded.

Error: only named arguments may follow *expression

Hi

I installed handyspark on Google Dataproc cluster but I am unable to import it.

When I import it using from handyspark import * it gives me an error only named arguments may follow an * expression.

Can anyone please guide me on why this is happening.

Options for hist() plot

this is a really useful library @dvgodoy! I have a question related to the options available for the hist() plot. The command hdf.cols['Embarked'].hist(ax=axs[0]) does not accept a lot of the keywords that are usually available with the pandas hist() plot. For e.g., bins is accepted, but grid=True is not accepted and range is not accepted.

How do I find out what keyword arguments can be passed to handyspark dataframe plots? I'd greatly appreciate your feedback. Thanks,

Confusion martrix not working with DataBricks 7.4 ML level of spark

Using DataBricks v7.4 ML cluster, with Spark 3.01

(also I think you are using MultiClassMetrics to get the AUC value, but that seems to be high when one has a LR model. See:
https://stackoverflow.com/questions/60772315/how-to-evaluate-a-classifier-with-apache-spark-2-4-5-and-pyspark-python )

I get this error when trying to do the confusion matrix:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-3318334168327659> in <module>
----> 1 bcm.print_confusion_matrix(.572006)

/databricks/python/lib/python3.7/site-packages/handyspark/extensions/evaluation.py in print_confusion_matrix(self, threshold)
    111     confusionMatrix: pd.DataFrame
    112     """
--> 113     cm = self.confusionMatrix(threshold).toArray()
    114     df = pd.concat([pd.DataFrame(cm)], keys=['Actual'], names=[])
    115     df.columns = pd.MultiIndex.from_product([['Predicted'], df.columns])

/databricks/python/lib/python3.7/site-packages/handyspark/extensions/evaluation.py in confusionMatrix(self, threshold)
     92     """
     93     scoreAndLabels = self.call2('scoreAndLabels').map(lambda t: (float(t[0] > threshold), t[1]))
---> 94     mcm = MulticlassMetrics(scoreAndLabels)
     95     return mcm.confusionMatrix()
     96 

/databricks/spark/python/pyspark/mllib/evaluation.py in __init__(self, predictionAndLabels)
    254         sc = predictionAndLabels.ctx
    255         sql_ctx = SQLContext.getOrCreate(sc)
--> 256         numCol = len(predictionAndLabels.first())
    257         schema = StructType([
    258             StructField("prediction", DoubleType(), nullable=False),

/databricks/spark/python/pyspark/rdd.py in first(self)
   1491         ValueError: RDD is empty
   1492         """
-> 1493         rs = self.take(1)
   1494         if rs:
   1495             return rs[0]

/databricks/spark/python/pyspark/rdd.py in take(self, num)
   1473 
   1474             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1475             res = self.context.runJob(self, takeUpToNumLeft, p)
   1476 
   1477             items += res

/databricks/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1225             finally:
   1226                 os.remove(filename)
-> 1227         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1228         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1229 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2131.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2131.0 (TID 9926, ip-10-172-254-217.us-west-2.compute.internal, executor driver): java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:159)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:150)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:150)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:703)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:479)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2146)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:271)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2519)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2466)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2460)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2460)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1152)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1152)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1152)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2656)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2331)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2352)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2371)
	at org.apache.spark.api.python.PythonRDD$.collectPartitions(PythonRDD.scala:197)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:217)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:159)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:150)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:150)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:442)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:703)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:479)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2146)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:271)

I also get deprecation warnings when doing the ROC plots:

image

tukey outliers fails

I tried:

hdf.outliers(method='tukey', k=3.)

and I got this error with pySpark 3.01

HANDY EXCEPTION SUMMARY

Location: "<string>"
Line	: 3
Function: raise_from
Error	:    +- Relation[dayOfWeek#73,AIRLINE#74,FLIGHT_NUMBER#75,ORIGIN_AIRPORT#76,DESTINATION_AIRPORT#77,DISTANCE#78,SCHEDULED_TIME#79,plannedDepartTime#80,label#81] parquet
---------------------------------------------------------------------------
HandyException: cannot resolve 'approx_percentile(`FLIGHT_NUMBER`, CAST(0.25BD AS DOUBLE), 100.0BD)' due to data type mismatch: argument 3 requires integral type, however, '100.0BD' is of decimal(4,1) type.; line 1 pos 0;

might be related to #26

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.