
Comments (9)

wolfc86 avatar wolfc86 commented on August 21, 2024 7

@SandervandenOord thanks for sharing this technique! I found it really valuable for a project I'm working on.

Some of my schema names needed to be quoted in the SQL statement (long story 🙄) so I had to change this line:

schema_tablename = '{}.{}'.format(schema, table_name)

to this:

schema_tablename = '"{}"."{}"'.format(schema, table_name)

I also wanted to use ',' as my separator, but found my routine would error whenever a CSV row had a cell whose value also contained a comma, even if the value was enclosed in quotation marks. It seems cursor.copy_from doesn't expose a way of handling quoted values.

Here's a minor revision that uses cursor.copy_expert instead:

from io import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=index, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()

    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)", output)

    connection.commit()
    cursor.close()

Just sharing in case other people run into the same.
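
In case it helps, here's a minimal, hypothetical way to call the revised function, assuming a local Postgres database reachable through SQLAlchemy (the connection string, schema, and table name are placeholders):

import pandas as pd
from sqlalchemy import create_engine

# hypothetical connection string; adjust for your own database
engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/mydb')

# a value containing a comma is handled correctly thanks to FORMAT CSV
df = pd.DataFrame({'id': [1, 2], 'note': ['plain text', 'contains, a comma']})

df_to_database(engine, df, schema='public', table_name='my_table')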

Thank you again for the great technique!


rpanai avatar rpanai commented on August 21, 2024 2

I was not able to use odo, and I'm wondering whether this project is still maintained. Maybe @mrocklin can tell us something about it. Anyway, I modified your solution to work in Python 3 too.

from io import StringIO

def sendToPG(df, tableName, con):
    output = StringIO()
    df.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)
    raw = con.raw_connection()
    curs = raw.cursor()
    # null values become ''
    columns = df.columns
    curs.copy_from(output, tableName, null="", columns=tuple(columns))
    curs.connection.commit()
    curs.close()


d6tdev avatar d6tdev commented on August 21, 2024 2

In case you are looking for an alternative, you might want to consider d6tstack. You can process CSVs and then export to CSV, Parquet, or SQL without having to write custom functions. It can load multiple files and deals with data schema changes (added/removed columns). Chunked out-of-core support is already built in, and it benchmarks well against df.to_sql().

import glob
import d6tstack.combine_csv

def apply(dfg):
    # do stuff
    return dfg

c = d6tstack.combine_csv.CombinerCSV(['bigfile.csv'], apply_after_read=apply, sep=',')

# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply)

# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename')  # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # slow but flexible


Chithra1206 avatar Chithra1206 commented on August 21, 2024 1

@SandervandenOord Thank you for this amazing technique! I had to push over 4 GB of data (mostly text fields: conversations) to Postgres, and this made my task much simpler. It took about 30 minutes to run (actual wall time: 30min 35s).

I modified @wolfc86's code to add the chunksize option in df.to_csv(...).

Then I split the dataframe into chunks, created the header outside the method, and pushed the data into the db in small batches (instead of one bulk insert). It took 8 minutes to execute.

Revised snippet:

from io import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='append', encoding='utf-8', index=False):
    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index, chunksize=5000)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()

    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)", output)

    connection.commit()
    cursor.close()

# Create the empty table with the header
df[:0].to_sql(table_name, engine, schema=schema, index=False, if_exists='replace')

# Split the dataframe into a list of dataframes, each roughly a tenth of the rows
n = int(round(1/10 * df.shape[0], -2))
split_df = [df[i:i+n] for i in range(0, df.shape[0], n)]

# Call df_to_database() on each chunk
for chunk in split_df:
    df_to_database(engine, chunk, schema, table_name)

Thanks again!


camesine avatar camesine commented on August 21, 2024

I have a pandas dataframe of 8 million rows. I found a fast method on Stack Overflow to write this data to Postgres: it takes less than 2 minutes.
Today I found odo and was hoping it would be faster, but it wasn't.
In fact, my guess is it would take 30 minutes or so with odo; I stopped after 7 minutes.

Here's the code for faster writing of a dataframe to Postgres, hope it helps you guys:

from cStringIO import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='replace', sep='\x01', encoding='utf-8'):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=False, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding, index=False)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    schema_tablename = '{}.{}'.format(schema, table_name)
    cursor.copy_from(output, schema_tablename, sep=sep, null='')
    connection.commit()
    cursor.close()

Thanks!!!


PhilippeH1967 avatar PhilippeH1967 commented on August 21, 2024

Hi, could you help me understand what should be passed in as schema?
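
For reference, schema is just the name of the Postgres schema the target table should live in ('public' is the Postgres default), and table_name is the table itself. A hypothetical call, reusing the df_to_database function and engine from the snippets above:

# placeholder names: adjust the schema and table to your own database
df_to_database(engine, df, schema='public', table_name='my_table')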


mairanteodoro avatar mairanteodoro commented on August 21, 2024

@wolfc86 You forgot to add index as a function parameter. The first line of your code should be:

def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
    [...]


wolfc86 avatar wolfc86 commented on August 21, 2024

Ah, right you are @mairanteodoro! I updated the snippet in my comment. Thanks!


PandaWhoCodes avatar PandaWhoCodes commented on August 21, 2024


@Chithra1206's chunked approach above works like a charm. Thanks!

