Comments (6)
Multiprocessing is most useful for single-threaded (i.e. GIL-restricted) Python code.
Polars is natively multithreaded, so a single process can, in theory, use all of your compute.
Suppose you can structure your code in the form def f(df: LazyFrame) -> LazyFrame: ..., where a single function f acting on LazyFrames does all the compute. Then you could create the list of lazy queries [f(df) for df in input_frames] and run pl.collect_all on that list to execute them all in parallel, without needing multiprocessing.
Interesting, so you would suggest something like pl.collect_all([f(df) for df in pl.scan_parquet(file_names)])?
Thanks, I think I got it to work. Outstanding!
So I ran some more tests, something like
POLARS_MAX_THREADS=20 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> aggregator = AggregatorTransformer(events_path=paths.events_path, chunk_size=10000, use_chunking=False)
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)
>>> end_time = datetime.now()
>>> print(end_time-start_time)
0:17:14.741605
[...]
root@1715047166c4:/usr/src/app/pipeline/v0# POLARS_MAX_THREADS=30 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> aggregator = AggregatorTransformer(events_path=paths.events_path, chunk_size=10000, use_chunking=False)
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)
>>> end_time = datetime.now()
>>> print(end_time-start_time)
0:20:07.030533
[...]
root@1715047166c4:/usr/src/app/pipeline/v0# POLARS_MAX_THREADS=10 python
Python 3.12.2 (main, Mar 12 2024, 11:02:14) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pickle
>>> from lib.aggregation.aggregator import AggregatorTransformer
>>> from lib.common import paths
>>> from datetime import datetime
>>> record = pickle.load(open(paths.full_record_path, 'rb'))
>>> aggregator = AggregatorTransformer(events_path=paths.events_path, chunk_size=10000, use_chunking=False)
>>> start_time = datetime.now()
>>> record = aggregator.transform(record)
>>> end_time = datetime.now()
>>> print(end_time-start_time)
0:22:10.055686
The aggregator does, under the hood, iterate over several of the aforementioned workloads:
aggregated = self.eval_aggregate(agg_function, files=file_names)
with
@staticmethod
def eval_aggregate(func, files, **kwargs):
    results_lazy = [func([file], **kwargs) for file in files]
    results = [res for res in pl.collect_all(results_lazy)]
    full_res = pl.concat(results).to_pandas()
    return full_res
The functions which we apply this to are all of the form
def func(file_names):
    intermediate_step = pl.scan_parquet(file_names)
    [...]
    return complicated_result
What matters is that the compute is distributed. The majority of the time is spent on the
results = [res for res in pl.collect_all(results_lazy)]
step, so the bottleneck is not the to_pandas call or any later index setting on the pandas frame, which might scale differently.
You can see from the outputs that the runtime seems to be fairly independent of the number of threads assigned to the session. It also wobbles a bit: one 20-thread trial came in at 17:14 (min:sec), which is lower than the 30-thread time of about 20:07, but a second try "only" got roughly 20:12.
Even if we treat the 17:14 as an outlier, shouldn't such an embarrassingly parallel problem scale more strongly with the number of threads assigned?
It's very hard to know what the bottleneck is in your specific case. It may or may not be something in your code; you could be reading parquet files from a slow source, for example. (I'm assuming you don't have anything really slow like map_elements in your code.)
If Amazon says you have 32 vCPUs, it probably means you have 16 hyperthreaded cores.
So comparing 20 to 30 threads, I don't know if I would even expect a strong difference in the best case.
If you really want to optimize the number of threads for a particular AWS instance, you could probably make a "trivial" parallel query that doesn't involve scan_parquet. For example, make 1000 random float columns and compute the median of them all, or something.
I see. Strangely enough, when I change the underlying machine to a smaller machine type, I do get a change in speed. Could there be something in the way the thread pool size actually gets updated, such that it shows a change but nothing is actually different under the hood?
Either way, I'll close the ticket; it was quite helpful to switch to collect_all().