Comments (3)
This is the version that works with ThreadPoolExecutor, but seems to block. Here is the version that uses thread pooling:
```python
import timeit
import warnings

import pandas as pd
import psycopg

# method of the DataReader class
def read_sql(self, sql, params=None):
    t1 = timeit.default_timer()
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", UserWarning)
        with psycopg.connect(self.connect_str, autocommit=True) as conn:
            df = pd.read_sql(sql, con=conn, params=params)
    self.log.debug(f'perf: {timeit.default_timer() - t1}')
    return df
```
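As an aside, the `catch_warnings` block above silences the UserWarning pandas emits when `read_sql` is handed a raw DBAPI connection instead of a SQLAlchemy connectable. A minimal stand-alone sketch of that suppression pattern (the `noisy` function here is just a made-up stand-in for the warning source):

```python
import warnings

def noisy():
    # stand-in for pd.read_sql warning about a non-SQLAlchemy connection
    warnings.warn("pandas only supports SQLAlchemy connectables", UserWarning)
    return 42

# record=True lets us verify the warning was filtered out, not just hidden
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("ignore", UserWarning)
    result = noisy()

print(result, len(caught))  # 42 0
```

This only hides the warning; pandas still runs the query through the raw connection the same way.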
The concurrent.futures code is this:
```python
import concurrent.futures as cf

import numpy as np

def test_thread_pool():
    db_reader = DataReader()
    sql = "select id, name from factor f where f.id = ANY(%s)"
    threads = 20
    id_partitions = np.array_split(list(range(1, 10000)), threads)
    id_partitions = [[p.tolist()] for p in id_partitions]
    # renamed from `exec`, which shadows the builtin
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        futures = {
            executor.submit(db_reader.read_sql, sql, params=p): p
            for p in id_partitions
        }
        for future in cf.as_completed(futures):
            ids = futures[future]
            try:
                df = future.result()
            except Exception:
                log.exception(f'error retrieving data for: {ids}')
            else:
                if df is not None:
                    print(f'shape: {df.shape}')
```
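For clarity, here is what the partitioning step above produces: `np.array_split` divides the 9999 ids into 20 nearly equal chunks, and each chunk is wrapped in an extra list so psycopg binds it as a single array argument for `ANY(%s)`:

```python
import numpy as np

threads = 20
id_partitions = np.array_split(list(range(1, 10000)), threads)
# wrap each chunk so it binds as one array parameter for ANY(%s)
id_partitions = [[p.tolist()] for p in id_partitions]

print(len(id_partitions))        # 20
print(id_partitions[0][0][:3])   # [1, 2, 3]
print(len(id_partitions[0][0]))  # 500 (9999 ids don't split evenly)
```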
The output of the debug line from read_sql looks like this:
perf: 0.7313497869981802
perf: 0.8116309550023288
perf: 3.401154975006648
perf: 5.22201336100261
perf: 6.325166654998611
perf: 6.338692951001576
perf: 6.573095380997984
perf: 6.5976604809984565
perf: 6.8282670119951945
perf: 7.291718505999597
perf: 7.4276196580030955
perf: 7.407097272000101
perf: 8.38801568299823
perf: 9.119963648998237
You'll notice that the times keep increasing. I'd have expected them all to be roughly the same, so it seems there is some blocking around the SQL calls. Also, the time gap between the first two threads and the 3rd is always about 2-3 seconds; why is that?
I've also tried creating a new DataReader instance for each thread, but it has the same effect.
Does anyone know if the connection or pandas read_sql blocks, or how to solve this?
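One hunch you can test without a database: building the DataFrame is CPU-bound work that holds the GIL, so the CPU-heavy part of each read_sql call serializes across threads, while pure network waiting does not. This toy sketch (no pandas or psycopg involved; `cpu_task` and `io_task` are made-up stand-ins) shows the two regimes:

```python
import concurrent.futures as cf
import time

def cpu_task(n):
    # pure-Python loop: holds the GIL, so concurrent threads serialize
    t0 = time.perf_counter()
    total = sum(i * i for i in range(n))
    return time.perf_counter() - t0

def io_task(seconds):
    # sleep releases the GIL, like waiting on a socket
    t0 = time.perf_counter()
    time.sleep(seconds)
    return time.perf_counter() - t0

with cf.ThreadPoolExecutor(max_workers=4) as ex:
    cpu_times = sorted(f.result() for f in cf.as_completed(
        [ex.submit(cpu_task, 1_000_000) for _ in range(4)]))

with cf.ThreadPoolExecutor(max_workers=4) as ex:
    io_times = sorted(f.result() for f in cf.as_completed(
        [ex.submit(io_task, 0.2) for _ in range(4)]))

print(cpu_times)  # per-task times inflate and spread out, like the perf log
print(io_times)   # all roughly 0.2s: real concurrency while the GIL is free
```

If read_sql's times look like `cpu_times`, the bottleneck is GIL contention during result parsing / DataFrame construction, and a ProcessPoolExecutor (or fewer, larger queries) would help more than adding threads.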
from pandas.
I have not had enough time to look at this, but from the issue title, shouldn't you await this to remove the warning:
tasks = {db_reader.read_sql_async(sql, params=p) for p in id_partitions}
Because db_reader.read_sql_async is an async method.
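A minimal stand-alone sketch of the pattern being suggested (`read_sql_async` below is a dummy coroutine, not the real method) showing where the await has to happen when collecting results with as_completed:

```python
import asyncio

async def read_sql_async(p):
    # dummy stand-in for the real async DB call
    await asyncio.sleep(0.01)
    return f"result for {p}"

async def main():
    tasks = [asyncio.create_task(read_sql_async(p)) for p in range(3)]
    results = []
    for coro in asyncio.as_completed(tasks):
        # each coroutine must be awaited somewhere, or Python warns
        # "coroutine ... was never awaited"
        results.append(await coro)
    return results

results = asyncio.run(main())
print(sorted(results))  # ['result for 0', 'result for 1', 'result for 2']
```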
from pandas.
I am. You can see the await in the for/as_completed loop.
I think the problem is probably that an async connection or cursor doesn't work with pandas. If so, I'd like to understand whether pandas blocks; as per the 2nd example, why am I not able to achieve better concurrency?
from pandas.