"""
pandabase converts pandas DataFrames to & from SQL databases.

It replaces pandas.to_sql and pandas.read_sql, and requires the user
to select a unique index. This allows upserts and makes it easier to
maintain a dataset that grows over time, especially time series.

pandabase:
* is much simpler than pandas.io.sql
* is only compatible with the newest versions of pandas & sqlalchemy
* is not guaranteed
* definitely supports sqlite; may or may not support other backends
* uses the sqlalchemy core and pandas; has no additional dependencies

by sam beck
github.com/notsambeck/pandabase

largely copied from pandas:
https://github.com/pandas-dev/pandas
and dataset:
https://github.com/pudo/dataset/
"""
from .helpers import *

import logging

import pandas as pd
from pandas.api.types import (is_string_dtype, is_datetime64_any_dtype,
                              is_integer_dtype, is_float_dtype)
from pytz import UTC
import sqlalchemy as sqa
from sqlalchemy import Table
from sqlalchemy.exc import IntegrityError

def to_sql(df: pd.DataFrame, *,
           table_name: str,
           con,
           how='fail',
           strict=True):
    """
    Write records stored in a DataFrame to a SQL database.

    Converts any datetime to UTC.
    Requires a unique, named index as DataFrame.index; to insert into an
    existing table, this index must be consistent with the table's primary key.

    Parameters
    ----------
    df : DataFrame
    table_name : string
        Name of SQL table.
    con : database string URI or sqlalchemy engine
    how : {'fail', 'upsert', 'append'}, default 'fail'
        - fail:
            If table exists, raise an error and stop.
        - append:
            If table exists, append data; raise if the index overlaps.
            Create table if it does not exist.
        - upsert:
            Create table if needed;
            if a record exists, update it; else insert it.
    strict : default True
        If True, raise instead of coercing anything.
    """
    ##########################################
    # 1. make connection objects; validate inputs
    df = df.copy()
    engine = engine_builder(con)
    meta = sqa.MetaData()

    if how not in ('fail', 'append', 'upsert'):
        raise ValueError(f"'{how}' is not a valid value for parameter 'how'")
    if not isinstance(df, pd.DataFrame):
        raise ValueError('to_sql() requires a DataFrame as input')
    if not df.index.is_unique:
        raise ValueError('DataFrame index is not unique.')
    if df.index.hasnans:
        raise ValueError('DataFrame index has NaN values.')
    if df.index.name is None:
        raise ValueError('DataFrame.index.name is None; set it with df.index.name = <name>')

    # make a dict of df columns for later:
    df_cols_dict = make_clean_columns_dict(df)
    ###########################################
    # 2a. load table from db schema; the db table will be the reference
    if has_table(engine, table_name):
        if how == 'fail':
            raise NameError(f'Table {table_name} already exists; parameter how is set to "fail".')
        table = Table(table_name, meta, autoload=True, autoload_with=engine)
        if how == 'upsert' and table.primary_key == PANDABASE_DEFAULT_INDEX:
            raise IOError('Cannot upsert with an automatic index')
    # 2b. unless it's a brand-new table
    else:
        logging.info(f'Creating new table {table_name}')
        table = Table(table_name, meta,
                      *[make_column(name, info) for name, info in df_cols_dict.items()
                        if info['dtype'] is not None])
    ###############################################################
    # 3. iterate over df columns; confirm that types are compatible and
    # all columns exist; collect all-NaN columns for deletion
    new_cols = []
    drop_cols = []
    for name, df_col_info in df_cols_dict.items():
        if df_col_info['dtype'] is None:
            drop_cols.append(name)

        if name not in table.columns:
            if df_col_info['dtype'] is not None:
                new_cols.append(make_column(name, df_col_info))
                continue
            else:
                logging.warning(f'tried to add all-NaN column {name}')
                continue

        # check that dtypes and PKs match for existing columns
        col = table.columns[name]
        if col.primary_key != df_col_info['pk']:
            raise ValueError(f'Inconsistent pk for col: {name}! db: {col.primary_key} / '
                             f'df: {df_col_info["pk"]}')

        if df_col_info['dtype'] is None:
            df_col_info['dtype'] = get_column_dtype(col, pd_or_sqla='pd')

        db_sqla_dtype = get_column_dtype(col, pd_or_sqla='sqla')

        # COERCE BAD DATATYPES
        if db_sqla_dtype != df_col_info['dtype']:
            # This is where things become complicated.
            # We know the type of the existing db column; this is the correct
            # type for new data. We also know the pandas dtype of the df column.
            # This branch generally executes when the db dtype is a real type
            # and the df dtype is 'object', often because the df column is a
            # non-nullable pandas dtype (np.int64, np.bool) containing NaNs,
            # while the db column has the 'true' datatype (possibly plus NULL).
            # Simply coercing the column with .astype(np.bool) etc. also
            # coerces None to 'None'.
            # Solution: build a mask of NaN values, coerce the columns, and
            # when inserting values to the db, first check whether the value
            # is None, then insert.
            mask = df.isna()
            db_pandas_dtype = get_column_dtype(col, pd_or_sqla='pd')
            if is_datetime64_any_dtype(db_pandas_dtype):
                df[name] = pd.to_datetime(df[name].values, utc=True)
            elif (is_integer_dtype(df_col_info['dtype']) and is_float_dtype(db_pandas_dtype)) or (
                    is_float_dtype(df_col_info['dtype']) and is_integer_dtype(db_pandas_dtype)):
                df[name] = df[name].astype(db_pandas_dtype)
                df.loc[mask[name], name] = None
            elif is_string_dtype(df_col_info['dtype']):
                df[name] = df[name].astype(db_pandas_dtype)
                df.loc[mask[name], name] = None
            else:
                raise ValueError(
                    f'Inconsistent type for col: {name}. '
                    f'db: {db_pandas_dtype} / '
                    f'df: {df_col_info["dtype"]}')
    # delete any all-NaN columns
    if drop_cols:
        df.drop(drop_cols, axis=1, inplace=True)
        for col in drop_cols:
            del df_cols_dict[col]

    # make any new columns as needed with ALTER TABLE
    if new_cols:
        col_names_string = ", ".join([col.name for col in new_cols])
        new_column_warning = (f' Table[{table_name}]:'
                              f' new Series [{col_names_string}] added around index {df.index[0]}')
        if strict:
            raise ValueError(new_column_warning)
        logging.warning(new_column_warning)
        for new_col in new_cols:
            with engine.begin() as conn:
                conn.execute(f'ALTER TABLE {table_name} '
                             f'ADD COLUMN {new_col.name} {new_col.type.compile(engine.dialect)}')
    df.index.name = clean_name(df.index.name)

    # convert any non-tz-aware datetimes to UTC using pd.to_datetime (warn)
    for col in df.columns:
        if is_datetime64_any_dtype(df[col]) and df[col].dt.tz != UTC:
            if strict:
                raise ValueError(f'strict=True; column {col} is tz-naive. Please correct.')
            else:
                logging.warning(f'{col} was stored in tz-naive format; automatically converted to UTC')
                df[col] = pd.to_datetime(df[col].values, utc=True)

    with engine.begin() as con:
        meta.create_all(bind=con)
    ######################################################
    # FINALLY: either insert/fail, append/fail, or upsert
    if how in ('append', 'fail'):
        # raise if repeated index
        with engine.begin() as con:
            rows = []
            for index, row in df.iterrows():
                # append row as a dict
                rows.append(row.to_dict())
                # add the index value to the row under the index's name
                rows[-1][df.index.name] = index
            con.execute(table.insert(), rows)

    elif how == 'upsert':
        for i in df.index:
            # check index uniqueness by attempting insert; if it fails, update
            row = df.loc[i].copy()
            row[row.isna()] = None
            try:
                with engine.begin() as con:
                    values = row.to_dict()
                    values[df.index.name] = i  # set index column to i
                    con.execute(table.insert().values(values))
            except IntegrityError:
                logging.info('upsert: IntegrityError on insert => doing update')
                with engine.begin() as con:
                    upsert = table.update() \
                        .where(table.c[df.index.name] == i) \
                        .values(row.to_dict())
                    con.execute(upsert)

    return table
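# The coercion branch in section 3 above masks NaNs before casting, then
# restores None afterwards. A hypothetical standalone sketch of that
# technique (the example column and values are illustrative, not from the
# library):
#
# ```python
# import numpy as np
# import pandas as pd
#
# df = pd.DataFrame({'x': [1.0, np.nan, 3.0]})
#
# # 1. remember where the missing values are, before coercing
# mask = df['x'].isna()
#
# # 2. coerce to the database's dtype (integer here); NaN cannot survive
# #    an integer cast, so fill temporarily
# coerced = df['x'].fillna(0).astype('int64')
#
# # 3. restore missing values as None (object dtype), so a DB driver
# #    would write NULL instead of a bogus 0
# out = coerced.astype(object)
# out[mask] = None
#
# print(list(out))  # [1, None, 3]
# ```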

def read_sql(table_name: str,
             con,
             columns=None):
    """
    Convenience wrapper around pd.read_sql_query.

    Reflects metadata; reads Table or Table[columns].
    TODO: add range limit parameters

    :param table_name: str
    :param con: database string URI or sqlalchemy engine
    :param columns: list (default None => select all columns)
    """
    engine = engine_builder(con)
    meta = sqa.MetaData(bind=engine)
    table = Table(table_name, meta, autoload=True, autoload_with=engine)

    # find the index column and column dtypes
    index_col = None
    datetime_cols = []
    other_cols = {}
    datetime_index = False
    for col in table.columns:
        dtype = get_column_dtype(col, pd_or_sqla='pd')
        if col.primary_key:
            index_col = col.name
            datetime_index = is_datetime64_any_dtype(dtype)
            continue
        if is_datetime64_any_dtype(dtype):
            datetime_cols.append(col.name)
        else:
            other_cols[col.name] = dtype

    # make selector from columns, or select whole table
    if columns is None:
        s = sqa.select([table])
    else:
        if index_col not in columns:
            raise NameError(f'User-supplied columns do not include index col: {index_col}')
        selector = [table.c[col] for col in columns]
        s = sqa.select(selector)

    df = pd.read_sql_query(s, engine, index_col=index_col)

    # mask = df.isna()
    for name in datetime_cols:
        df[name] = pd.to_datetime(df[name].values, utc=True)
        # df[mask][name] = pd.NaT

    if datetime_index:
        df.index = pd.to_datetime(df.index.values, utc=True)
        df.index.name = index_col

    return df
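# The how='upsert' branch in to_sql attempts a plain INSERT and falls back
# to an UPDATE when the primary key already exists. A minimal sketch of that
# same pattern against the stdlib sqlite3 module, assuming an illustrative
# table 't' (not part of this library):
#
# ```python
# import sqlite3
#
# con = sqlite3.connect(':memory:')
# con.execute('CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)')
#
# def upsert(con, idx, val):
#     """Try to insert; on a primary-key collision, update instead."""
#     try:
#         con.execute('INSERT INTO t (id, val) VALUES (?, ?)', (idx, val))
#     except sqlite3.IntegrityError:
#         con.execute('UPDATE t SET val = ? WHERE id = ?', (val, idx))
#
# upsert(con, 1, 'a')   # no collision -> insert
# upsert(con, 1, 'b')   # pk collision -> update
# print(con.execute('SELECT id, val FROM t').fetchall())  # [(1, 'b')]
# ```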