
Comments (6)

s-banach avatar s-banach commented on July 23, 2024

Multiprocessing is most useful for single-threaded Python code (i.e. GIL-restricted Python code).
Polars is natively multithreaded, so a single process can, in theory, use all your compute.

Suppose you can structure your code in the form def f(df: LazyFrame) -> LazyFrame: ...,
where a single function f acting on LazyFrames does all the compute.
Then you can create the list of lazy queries [f(df) for df in input_frames]
and run pl.collect_all on that list to execute them all in parallel without needing multiprocessing.

from polars.

HCelion avatar HCelion commented on July 23, 2024

Interesting, so you would suggest something like

pl.collect_all([f(df) for df in pl.scan_parquet(file_names)])

?


HCelion avatar HCelion commented on July 23, 2024

Thanks, I think I got it to work. Outstanding!


HCelion avatar HCelion commented on July 23, 2024

So I ran some more tests, something like

 POLARS_MAX_THREADS=20 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> aggregator = AggregatorTransformer(events_path=paths.events_path, chunk_size=10000, use_chunking=False)
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)

>>> end_time =  datetime.now()
>>> print(end_time-start_time)
0:17:14.741605

[...]
root@1715047166c4:/usr/src/app/pipeline/v0# POLARS_MAX_THREADS=30 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)

>>> end_time =  datetime.now()
>>> print(end_time-start_time)
0:20:07.030533
[...]
root@1715047166c4:/usr/src/app/pipeline/v0# POLARS_MAX_THREADS=10 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> aggregator = AggregatorTransformer(events_path=paths.events_path, chunk_size=10000, use_chunking=False)
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)

>>> end_time =  datetime.now()
>>> print(end_time-start_time)
0:22:10.055686

Under the hood, the aggregator iterates over several of these aforementioned workloads:

aggregated = self.eval_aggregate(agg_function, files=file_names)

with

    @staticmethod
    def eval_aggregate(func, files, **kwargs):
        results_lazy = [func([file], **kwargs) for file in files]
        results = [res for res in pl.collect_all(results_lazy)]
        full_res = pl.concat(results).to_pandas()
        return full_res

The functions which we apply this to are all of the form

def func(file_names):
    intermediate_step = pl.scan_parquet(file_names)
    [...]
    return complicated_result

What matters is that the compute is distributed. The majority of the time is spent on the

results = [res for res in pl.collect_all(results_lazy)]

step, so the bottleneck is not the to_pandas conversion or any later index setting on the pandas frame, which might scale differently.

You can see from the outputs that the run time seems fairly independent of the number of threads assigned to the session. It also wobbles a bit: the 20-thread trial came in at 17:14, which is lower than the 20:07 at 30 threads, but a second 30-thread try "only" managed 20:12.
Even if we treat the 17:14 as an outlier, shouldn't such an embarrassingly parallel problem scale more strongly with the number of threads assigned?


s-banach avatar s-banach commented on July 23, 2024

It's very hard to know what the bottleneck is with your specific case.
It may or may not be something in your code.
You could be reading parquet files from a slow source, for example.
(I'm assuming you don't have anything really slow like map_elements in your code.)

If amazon says you have 32 vCPUs, it probably means you have 16 hyperthreaded cores.
So comparing 20 to 30 threads, I don't know if I would even expect a strong difference in the best case.

If you really want to optimize the number of threads for a particular AWS instance, you could probably make a "trivial" parallel query that doesn't involve scan_parquet. For example, make 1000 random float columns and compute the median of them all, or something.


HCelion avatar HCelion commented on July 23, 2024

I see. Strangely enough, when I change the underlying machine to a smaller machine type, I do get a change in speed. Could there be something in the way the thread pool size actually gets updated, such that it reports a change but nothing is different under the hood?
Either way, I'll close the ticket; it was quite helpful to switch to collect_all().

