Comments (9)
@SandervandenOord thanks for sharing this technique! I found it really valuable for a project I'm working on.
Some of my schema names needed to be quoted in the SQL statement (long story), so I changed this line:
schema_tablename = '{}.{}'.format(schema, table_name)
to this:
schema_tablename = '"{}"."{}"'.format(schema, table_name)
I also wanted to use ',' as my separator, but found my routine would error when a CSV row had a cell whose value also contained a comma, even if the value was enclosed in quotation marks. It seems cursor.copy_from doesn't expose a way of handling quoted values.
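To show what I mean, here's a tiny sketch of the failure mode (the dataframe and values are just made-up examples):

import pandas as pd
from io import StringIO

df = pd.DataFrame({'id': [1], 'comment': ['Hello, world']})
buf = StringIO()
df.to_csv(buf, sep=',', header=False, index=False)
print(buf.getvalue())  # 1,"Hello, world"
# cursor.copy_from(buf, table, sep=',') uses the Postgres text format, so it treats
# every comma as a column delimiter and this row errors out; COPY ... FROM STDIN
# WITH (FORMAT CSV) via cursor.copy_expert understands the quoting.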
Here's a minor revision that uses cursor.copy_expert instead:
from io import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=index, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)", output)
    connection.commit()
    cursor.close()
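If it helps, a minimal usage sketch might look like this (the connection string, schema, and dataframe below are placeholders, not part of my actual project):

import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/mydb')  # placeholder credentials
df = pd.DataFrame({'id': [1, 2], 'comment': ['Hello, world', 'Another, value']})
df_to_database(engine, df, schema='public', table_name='my_table')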
Just sharing in case other people run into the same.
Thank you again for the great technique!
I was not able to use odo and I'm wondering if this project is still maintained. Maybe @mrocklin can tell us something about it. Anyway, I modified your solution to work in Python 3 too.
from io import StringIO

def sendToPG(df, tableName, con):
    output = StringIO()
    df.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)
    raw = con.raw_connection()
    curs = raw.cursor()
    columns = df.columns
    # null values become ''
    curs.copy_from(output, tableName, null="", columns=columns)
    curs.connection.commit()
    curs.close()
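One thing to keep in mind when trying this: unlike the earlier snippet, this version doesn't create the table, so it has to exist already. A minimal usage sketch (connection string, dataframe, and table name are placeholders):

import pandas as pd
from sqlalchemy import create_engine

con = create_engine('postgresql+psycopg2://user:password@localhost:5432/mydb')  # placeholder credentials
df = pd.DataFrame({'id': [1, 2], 'name': ['foo', 'bar']})
df[:0].to_sql('my_table', con, index=False, if_exists='replace')  # create an empty table first
sendToPG(df, 'my_table', con)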
In case you are looking for an alternative, you might want to consider d6tstack. You can process CSVs and then export to CSV, Parquet, or SQL without having to write custom functions. You can load multiple files, and it deals with data schema changes (added/removed columns). Chunked out-of-core support is already built in. It benchmarks well vs df.to_sql().
import glob
import d6tstack.combine_csv

def apply(dfg):
    # do stuff
    return dfg

c = d6tstack.combine_csv.CombinerCSV(['bigfile.csv'], apply_after_read=apply, sep=',')
# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply)

# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename')  # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # slow but flexible
@SandervandenOord Thank you for this amazing technique! I had to push over 4 GB of data (mostly text fields - conversations) to Postgres and this made my task much simpler. It took 30 minutes (actual wall time: 30min 35s) to run.
I modified @wolfc86's code to add the chunksize option in df.to_csv(...).
Then I split the dataframe into chunks, created the header outside the method, and pushed the data into the db in small batches (instead of one bulk insert). It took 8 minutes to execute.
Revised snippet:
from io import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='append', encoding='utf-8', index=False):
    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index, chunksize=5000)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)", output)
    connection.commit()
    cursor.close()

# Add header (create the empty table outside the method)
df[:0].to_sql(table_name, engine, schema=schema, index=False, if_exists='replace')

# Split the dataframe into a list of dataframes
n = int(round(1/10 * df.shape[0], -2))
split_df = [df[i:i+n] for i in range(0, df.shape[0], n)]

# Call df_to_database() on each chunk
for chunk in split_df:
    df_to_database(engine, chunk, schema, table_name)
Thanks again!
I have a pandas dataframe of 8 million rows. I found a fast method on Stack Overflow to write this data to Postgres: it takes less than 2 minutes.
Today I found odo and was hoping it would be faster, but it wasn't.
In fact my guess is it would take 30 minutes or so with odo; I stopped after 7 minutes. Here's the code for faster writing of a dataframe to Postgres, hope it helps you guys:
from cStringIO import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='replace', sep='\x01', encoding='utf-8'):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=False, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding, index=False)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    schema_tablename = '{}.{}'.format(schema, table_name)
    cursor.copy_from(output, schema_tablename, sep=sep, null='')
    connection.commit()
    cursor.close()
Thanks!!!
Hi, could you help me understand what you would put in schema?
@wolfc86 You forgot to add index as a function parameter. The first line of your code should be:
def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
[...]
Ah, right you are @mairanteodoro! I updated the snippet in my comment. Thanks!
Works like a charm. Thanks