akhtarshahnawaz / multiprocesspandas
Adds multiprocessing capabilities to Pandas to parallelize apply operations on DataFrames, Series and DataFrameGroupBy
License: MIT License
It would be nice to have the ability to aggregate in parallel as well.
For example, here's something that parallelizes a SeriesGroupBy:
import functools
from os import cpu_count

import pandas as pd
from multiprocess import Pool
from tqdm import tqdm

def attachpandas():
    pd.core.groupby.SeriesGroupBy.agg_parallel = agg_parallel
def agg_parallel(self, func, num_processes=cpu_count(), n_chunks=None, *args, **kwargs):
    """
    Add functionality to pandas so that you can do processing on groups on multiple cores at the same time.
    - This method passes each group's dataframe to the given func (including the key columns on which the group is formed).
    """
    if n_chunks is not None:
        assert n_chunks >= num_processes, "Number of chunks must be greater than or equal to the number of processes."
    func = functools.partial(func, *args, **kwargs)
    chunk_size = len(self) // (n_chunks if n_chunks is not None else num_processes)
    with Pool(num_processes) as p:
        ret_list = p.map(func, tqdm([df for _, df in self]), chunksize=max(1, chunk_size))
    if isinstance(ret_list[0], pd.DataFrame):
        return pd.concat(ret_list, axis=0)
    if isinstance(ret_list[0], pd.Series):
        return pd.concat(ret_list, axis=1).T
    return pd.DataFrame(ret_list, index=self.indices)

pd.core.groupby.SeriesGroupBy.agg_parallel = agg_parallel
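For anyone trying the snippet, the intended call shape mirrors the built-in agg. Here is the serial equivalent the parallel result should match (the agg_parallel call itself is shown only in a comment, since it needs the patch above; the data is hypothetical):

```python
import pandas as pd

# with the patch above, the parallel call would be:
#   df.groupby("g")["v"].agg_parallel(sum, num_processes=2)
df = pd.DataFrame({"g": ["a", "a", "b", "b"], "v": [1, 2, 3, 4]})
serial = df.groupby("g")["v"].agg("sum")
```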
I have the following function and the code that calls it. The same code works with apply, but fails with apply_parallel.
from rapidfuzz.fuzz import WRatio
from rapidfuzz.process import extractOne

def fuzzy_match(text, keys, threshold=86, scorer=WRatio):
    if text is not None:
        key, score, _ = extractOne(text, keys, scorer=scorer)
        if score >= threshold:
            return key, int(score)
    return '', 0
temp = df["payee_name"].apply_parallel(fuzzy_match, keys=remit_names)
Below is the traceback. What's wrong with it, and what change is required in order to use apply_parallel?
Traceback (most recent call last):
File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 78, in <module>
main(replacer1, replacer2, exclusions, keep_one, stop_words)
File "C:\Users\xzhang\Documents\VSCode\Python\Settings\xz_settings_main.py", line 114, in __call__
result = self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 75, in main
processing_df(df_remit_master, remit_names, replacer1, replacer2, exclusions, keep_one)
File "C:\Users\xzhang\Documents\VSCode\Python\Settings\xz_settings_main.py", line 114, in __call__
result = self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 55, in processing_df
temp = df["payee_name"].apply_parallel(fuzzy_match, keys=remit_names)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\multiprocesspandas\applyparallel.py", line 70, in series_apply_parallel
return pd.Series(ret_list, index=self.index)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\pandas\core\series.py", line 503, in __init__
com.require_length_match(data, index)
File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\pandas\core\common.py", line 561, in require_length_match
raise ValueError(
ValueError: Length of values (200) does not match length of index (100)
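Not a definitive diagnosis, but the doubled length (200 = 2 × 100) suggests the (key, score) pairs are being flattened into the result list before pd.Series is built. One workaround is to return a single object per row and split it into columns afterwards, sketched below with plain apply and a hypothetical stand-in matcher:

```python
import pandas as pd

def fuzzy_match_single(text):
    # stand-in for the real rapidfuzz matcher; returns one (key, score) tuple per element
    return (text.upper(), 90) if text else ('', 0)

s = pd.Series(["alpha", "beta", None], name="payee_name")
result = s.apply(fuzzy_match_single)  # one tuple per element
matched = pd.DataFrame(result.tolist(), index=s.index, columns=["key", "score"])
```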
It would be useful to have a switch to enable or disable the progress bar, I think!
Nice work!
When I tried to use your code, I found that it ended up taking more time than plain pandas: 90 seconds with plain pandas versus 135 seconds with this library. I think this is because of the df.copy() in this line, which copies the data unnecessarily.
This assumes an integer value is given for n_chunks, and it will throw a TypeError otherwise. Redefine n_chunks before raising the assertion error.
This is a great effort. Thanks!
I agree that apply features are among the most memory-intensive ones. I believe multiprocessing for read_csv() in the first place is essential.
All the best!
Hello, and excellent work with this one! How would you utilize this while creating and populating a new column in a pandas DataFrame? Thanks!
It works perfectly and is very awesome in a Jupyter notebook... unfortunately I am having an issue using it in a Plotly Dash application. I keep getting an error that my function func() got an unexpected keyword argument 'axis'.
Here is my function and how I am using it:
import requests

def func(x):
    return requests.get(x['URL'], allow_redirects=True, timeout=10).status_code

df['Status Code'] = df['URL'].apply_parallel(func, axis=0)
Please kindly assist with this. Thank you.
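This may or may not be the whole story, but note that apply on a Series (df['URL'] here) passes each scalar value to the function, so there is no axis to forward, and x['URL'] would also fail once the call got through, because x is already the URL string. A minimal check of that behavior, without the network call:

```python
import pandas as pd

df = pd.DataFrame({"URL": ["https://example.com", "https://example.org"]})

# Series.apply hands each scalar to the function: x IS the URL string already,
# so there is no row to index into and no axis argument to pass
is_string = df["URL"].apply(lambda x: isinstance(x, str))
```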
Hi, and thanks for this package. I ran into a problem while using it. I have a function that takes two values to calculate a third value. Surprisingly, the regular apply function takes less time than apply_parallel(). Is there anything I did wrong?
This is my code:
import time

import numpy as np
import pandas as pd
from multiprocesspandas import applyparallel

def calculate_new_features(value_1, value_2):
    new_value = ((value_1 ** 2) + (value_2 ** 2)) ** 0.5
    return new_value

def calculate_new_feature_row_based(row):
    new_feature = ((row['A'] ** 2) + (row['B'] ** 2)) ** 0.5
    return new_feature

df = pd.DataFrame(np.random.randint(0, 1000, size=(10000, 4)), columns=list('ABCD'))
# Without apply_parallel()
start_time = time.time()
df['F'] = df.apply(lambda row: calculate_new_features(row['A'], row['B']), axis=1)
running_time = time.time() - start_time
print(running_time)
# 0.3593626022338867 seconds
# Using apply_parallel()
start_time = time.time()
df['F'] = df.apply_parallel(calculate_new_feature_row_based, num_processes=4)
running_time = time.time() - start_time
print(running_time)
# 4.846656799316406 seconds
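Nothing is necessarily wrong: per-row work this small is dominated by process start-up and pickling, so the pool costs more than it saves. For this particular formula, a vectorized expression avoids apply entirely (np.hypot computes sqrt(a² + b²) in one pass):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.randint(0, 1000, size=(10000, 2)), columns=list("AB"))

# np.hypot(A, B) == sqrt(A**2 + B**2), computed column-at-a-time in C
df["F"] = np.hypot(df["A"], df["B"])
```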
Hi @akhtarshahnawaz , nice work!
Would you be interested in adding a progress bar to your library?
Hi,
When I use your module I receive a FutureWarning:
multiprocesspandas\applyparallel.py:29: FutureWarning: In a future version of pandas, a length 1 tuple will be returned when iterating over a groupby with a grouper equal to a list of length 1. Don't supply a list with a single grouper to avoid this warning.
ret_list = p.map(func, [df.copy() for idx, df in self])
multiprocesspandas\applyparallel.py:34: FutureWarning: In a future version of pandas, a length 1 tuple will be returned when iterating over a groupby with a grouper equal to a list of length 1. Don't supply a list with a single grouper to avoid this warning.
out = pd.DataFrame([idx for idx, df in self], columns=self.keys)
This is the line of code:
data2 = data.groupby(by=['id_cluster'], group_keys=False).apply_parallel(diam_measure, min_h=min_h)
I think the reason is that I used only one column for the grouping.
Do you have an idea how to solve this?
PS: this is only to keep the code future-proof; everything works well.
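The warning message itself hints at the fix: pass the grouping column as a plain string rather than a one-element list. Sketched with ordinary apply and a trivial stand-in for diam_measure:

```python
import pandas as pd

data = pd.DataFrame({"id_cluster": [1, 1, 2], "x": [1.0, 2.0, 3.0]})

# passing 'id_cluster' as a plain string, not ['id_cluster'],
# avoids the "length 1 tuple" FutureWarning when iterating the groups
result = data.groupby("id_cluster", group_keys=False).apply(lambda g: g["x"].max())
```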
Hey
I've been looking at your library and its implementation of multiprocessing to speed up some of my workflows, but I can't figure out the use case for the attachpandas function.
multiprocesspandas/multiprocesspandas/applyparallel.py
Lines 19 to 22 in b936010
I do understand that the implementation uses monkey-patching to add features rather than the API provided by pandas itself, but I can't figure out why that function exists. It isn't used anywhere either, if I am not mistaken. These lines are more than enough, judging by the old documentation.
BTW: it would likely be best to move to the official API one day, no?
https://pandas.pydata.org/pandas-docs/version/0.15/faq.html#adding-features-to-your-pandas-installation
https://stackoverflow.com/a/58705945/20716078
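For reference, the official extension route those links describe looks roughly like this (the accessor name "par" and its serial body are placeholders, not the library's actual API):

```python
import pandas as pd

# a bare-bones accessor registered through the documented pandas extension API
@pd.api.extensions.register_dataframe_accessor("par")
class ParallelAccessor:
    def __init__(self, pandas_obj):
        self._obj = pandas_obj

    def apply(self, func, **kwargs):
        # a real version would fan the rows out to a process pool here
        return self._obj.apply(func, **kwargs)

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
row_sums = df.par.apply(sum, axis=1)
```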
Thanks