
multiprocesspandas's Issues

add agg_parallel

It would be nice to have the ability to aggregate in parallel as well.

For example, here's something that parallelizes a SeriesGroupBy:

from multiprocess import Pool
import functools
from os import cpu_count

import pandas as pd
from tqdm import tqdm

def attachpandas():
    pd.core.groupby.SeriesGroupBy.agg_parallel = agg_parallel

def agg_parallel(self, func, *args, num_processes=cpu_count(), n_chunks=None, **kwargs):
    """
    Add functionality to pandas so that you can do processing on groups on multiple cores at the same time.
    - This method passes each group (including the key columns on which the group is formed) to the given func.
    """
    if n_chunks is not None:
        assert n_chunks >= num_processes, "Number of chunks must be greater than or equal to the number of processes."

    # Bind the extra arguments; each group is passed as the first positional argument.
    def call(group):
        return func(group, *args, **kwargs)

    chunk_size = len(self) // (n_chunks if n_chunks is not None else num_processes)

    with Pool(num_processes) as p:
        ret_list = p.map(call, tqdm([df for _, df in self]), chunksize=max(1, chunk_size))

    if isinstance(ret_list[0], pd.DataFrame):
        return pd.concat(ret_list, axis=0)

    if isinstance(ret_list[0], pd.Series):
        return pd.concat(ret_list, axis=1).T

    return pd.DataFrame(ret_list, index=self.indices)

pd.core.groupby.SeriesGroupBy.agg_parallel = agg_parallel
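
For reference, a minimal usage sketch under the monkey-patch above (the data and column names are invented for illustration):

import numpy as np

df = pd.DataFrame({"key": np.random.randint(0, 100, 10_000),
                   "val": np.random.rand(10_000)})

# Each group's values are averaged in a separate worker process.
means = df.groupby("key")["val"].agg_parallel(np.mean, num_processes=4)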

How to use apply_parallel correctly?

I have the following function and the code that calls it. The same code works with apply, but fails with apply_parallel.

from rapidfuzz.process import extractOne
from rapidfuzz.fuzz import WRatio

def fuzzy_match(text, keys, threshold=86, scorer=WRatio):
    if text is not None:
        key, score, _ = extractOne(text, keys, scorer=scorer)
        if score >= threshold:
            return key, int(score)
    return '', 0

temp = df["payee_name"].apply_parallel(fuzzy_match, keys=remit_names)

Below is the traceback. What is wrong, and what change is required in order to use apply_parallel?

Traceback (most recent call last):
  File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 78, in <module>
    main(replacer1, replacer2, exclusions, keep_one, stop_words)
  File "C:\Users\xzhang\Documents\VSCode\Python\Settings\xz_settings_main.py", line 114, in __call__
    result = self.func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 75, in main
    processing_df(df_remit_master, remit_names, replacer1, replacer2, exclusions, keep_one)
  File "C:\Users\xzhang\Documents\VSCode\Python\Settings\xz_settings_main.py", line 114, in __call__
    result = self.func(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\Users\xzhang\Documents\VSCode\Python\Payees\Code\name_matching_fuzzy_multi_para_pandas.py", line 55, in processing_df
    temp = df["payee_name"].apply_parallel(fuzzy_match, keys=remit_names)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\multiprocesspandas\applyparallel.py", line 70, in series_apply_parallel
    return pd.Series(ret_list, index=self.index)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\pandas\core\series.py", line 503, in __init__
    com.require_length_match(data, index)
  File "C:\Users\xzhang\AppData\Roaming\Python\Python311\site-packages\pandas\core\common.py", line 561, in require_length_match
    raise ValueError(
ValueError: Length of values (200) does not match length of index (100)
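
A plausible cause, judging only from the traceback (not verified against the library's internals): fuzzy_match returns a 2-tuple per element, and series_apply_parallel ends up with 200 values for 100 index entries when it rebuilds the Series, which suggests the tuples get flattened somewhere along the way. One workaround sketch is to return a single packed value per element and split it afterwards (the output column names here are hypothetical):

def fuzzy_match_packed(text, keys, threshold=86, scorer=WRatio):
    # Pack both results into one list so the output stays one value per input element.
    if text is not None:
        key, score, _ = extractOne(text, keys, scorer=scorer)
        if score >= threshold:
            return [key, int(score)]
    return ['', 0]

temp = df["payee_name"].apply_parallel(fuzzy_match_packed, keys=remit_names)
df[["matched_name", "match_score"]] = pd.DataFrame(temp.tolist(), index=df.index)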

Why df.copy() when spawning processes?

Nice work!
When I tried to use your code, I found that it ended up taking more time than plain pandas: 90 seconds with a regular apply versus 135 seconds with this library. I think this is because the df.copy() in this line makes unnecessary copies of the data.
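
For what it's worth, a minimal sketch to test that hypothesis (sizes are arbitrary): iterating a groupby already yields freshly materialized sub-frames, so the extra .copy() roughly doubles the allocation work.

import time
import numpy as np
import pandas as pd

df = pd.DataFrame(np.random.rand(1_000_000, 4), columns=list("ABCD"))
df["key"] = np.random.randint(0, 1_000, len(df))
grouped = df.groupby("key")

start = time.time()
_ = [g for _, g in grouped]          # groups as the iterator yields them
print("no extra copy:", time.time() - start)

start = time.time()
_ = [g.copy() for _, g in grouped]   # what the quoted line does before mapping
print("with .copy():", time.time() - start)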

Feature Request: Multi-processing for read_csv()

This is a great effort. Thanks!

I agree that the apply features are among the most memory-intensive ones. I believe multiprocessing for read_csv() would be just as essential.

All the best!
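
For reference, a minimal sketch of what such a feature could look like today using only the standard library, assuming the data is already split across multiple CSV files (the paths are hypothetical):

from multiprocessing import Pool
from glob import glob
import pandas as pd

def read_one(path):
    return pd.read_csv(path)

if __name__ == "__main__":
    paths = sorted(glob("data/part-*.csv"))   # hypothetical file layout
    with Pool() as pool:
        frames = pool.map(read_one, paths)    # one worker per file
    df = pd.concat(frames, ignore_index=True)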

Usage with plotly dash application

It works perfectly in a Jupyter notebook. Unfortunately, I am having an issue using it in a Plotly Dash application: I keep getting an error that my function func() got an unexpected keyword argument 'axis'.
Here is my function and how I am using it:
def func(x):
    import requests
    return requests.get(x['URL'], allow_redirects=True, timeout=10).status_code

df['Status Code'] = df['URL'].apply_parallel(func, axis=0)

Please kindly assist with this. Thank you.
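
A guess at what is going on (an assumption based on the error message, not verified against the library): apply_parallel on a Series appears to forward unrecognized keyword arguments such as axis=0 to the user function. Since each element of df['URL'] is already a string, dropping axis and taking the element directly might look like this:

import requests

def get_status(url):
    # The element itself is the URL string, so no x['URL'] lookup is needed.
    return requests.get(url, allow_redirects=True, timeout=10).status_code

df['Status Code'] = df['URL'].apply_parallel(get_status)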

It takes more time to apply a function while using apply_parallel()

Hi, and thanks for this package. I ran into a problem while using it: I have a function that takes two values and calculates a third. Surprisingly, the regular apply takes less time than apply_parallel(). Is there anything I did wrong?
This is my code:

import time

import numpy as np
import pandas as pd
from multiprocesspandas import applyparallel

def calculate_new_features(value_1, value_2):
    new_value = ((value_1 ** 2) + (value_2 ** 2)) ** 0.5
    return new_value

def calculate_new_feature_row_based(row):
    new_feature = ((row['A'] ** 2) + (row['B'] ** 2)) ** 0.5
    return new_feature

df = pd.DataFrame(np.random.randint(0, 1000, size=(10000, 4)), columns=list('ABCD'))

# Without apply_parallel()
start_time = time.time()
df['F'] = df.apply(lambda row: calculate_new_features(row['A'], row['B']), axis=1)
running_time = time.time() - start_time
print(running_time)
# 0.3593626022338867 seconds

# Using apply_parallel()
start_time = time.time()
df['F'] = df.apply_parallel(calculate_new_feature_row_based, num_processes=4)
running_time = time.time() - start_time
print(running_time)
# 4.846656799316406 seconds
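
Most likely nothing is wrong: for a function this cheap, the cost of spawning worker processes and serializing 10,000 rows dwarfs the arithmetic itself, so the parallel version loses. When the per-row work is this trivial, the vectorized route beats both; np.hypot computes exactly this root of the sum of squares:

start_time = time.time()
df['F'] = np.hypot(df['A'], df['B'])  # vectorized sqrt(A**2 + B**2)
print(time.time() - start_time)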

FutureWarning: In a future version of pandas, a length 1 tuple will...

Hi,
When I use your module, I receive a FutureWarning:
multiprocesspandas\applyparallel.py:29: FutureWarning: In a future version of pandas, a length 1 tuple will be returned when iterating over a groupby with a grouper equal to a list of length 1. Don't supply a list with a single grouper to avoid this warning.
ret_list = p.map(func, [df.copy() for idx, df in self])
multiprocesspandas\applyparallel.py:34: FutureWarning: In a future version of pandas, a length 1 tuple will be returned when iterating over a groupby with a grouper equal to a list of length 1. Don't supply a list with a single grouper to avoid this warning.
out = pd.DataFrame([idx for idx, df in self], columns=self.keys)

This is the line of code:
data2 = data.groupby(by=['id_cluster'], group_keys=False).apply_parallel(diam_measure, min_h=min_h)

I think the reason is that I used a single-column list for the grouping.

Do you have an idea how to solve this?

PS: this is only to keep the code future-proof; everything works well.
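
The warning text itself points at the fix: pass the single grouper as a scalar rather than a one-element list (a sketch, assuming nothing else depends on the list form):

data2 = data.groupby(by='id_cluster', group_keys=False).apply_parallel(diam_measure, min_h=min_h)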

[Question] Why attachpandas()?

Hey

I've been looking at your library and its implementation of multiprocessing to speed up some of my workflows, but I can't figure out the use case for the attachpandas function.

def attachpandas():
    pd.core.groupby.generic.DataFrameGroupBy.apply_parallel = group_apply_parallel
    pd.core.series.Series.apply_parallel = series_apply_parallel
    pd.core.frame.DataFrame.apply_parallel = df_apply_parallel

I do understand that the implementation uses monkey-patching to add features rather than the new API provided by pandas itself, but I can't figure out why that function exists; it isn't used anywhere either, if I am not mistaken. These lines are more than enough, if I go by the old documentation:

pd.core.groupby.generic.DataFrameGroupBy.apply_parallel = group_apply_parallel
pd.core.series.Series.apply_parallel = series_apply_parallel
pd.core.frame.DataFrame.apply_parallel = df_apply_parallel

BTW: it would likely be best to move to the official API one day, no?
https://pandas.pydata.org/pandas-docs/version/0.15/faq.html#adding-features-to-your-pandas-installation
https://stackoverflow.com/a/58705945/20716078

Thanks
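
For reference, a minimal sketch of the official accessor API route (the accessor name is invented, and the body delegates to a plain apply rather than the library's real implementation):

import pandas as pd

@pd.api.extensions.register_series_accessor("parallel")
class ParallelAccessor:
    def __init__(self, series):
        self._series = series

    def apply(self, func, **kwargs):
        # Stand-in body; the real one would dispatch to series_apply_parallel.
        return self._series.apply(func, **kwargs)

# Usage: df["payee_name"].parallel.apply(my_func)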
