britishgeologicalsurvey / etlhelper

ETL Helper is a Python ETL library to simplify data transfer into and out of databases.

Home Page: https://britishgeologicalsurvey.github.io/etlhelper/

License: GNU Lesser General Public License v3.0


etlhelper's People

Contributors

dvalters, kerberpolis, kinow, kinverarity1, koalageo, leorudczenko, metazool, rbroth, real-gecko, volcan01010, ximenesuk


etlhelper's Issues

Add load() function

Summary

As an ETL user, I would like a simple load function, so that I don't have to write the INSERT query by hand.

Description

Writing an insert query to use with executemany or copy_rows can be time-consuming, and in many cases the aim is only to write the data from the input to the appropriate columns.
In this case, a load function could save effort by automatically generating the INSERT statement from the data given.
A simple load function would also make it easy to write a copy_table_rows function.

def load(table, conn, data):
    """
    Load the data into pre-existing table on conn.
    """
    columns = get_column_names(data)
    insert_query = generate_insert_query(columns, conn)
    executemany(insert_query, conn, data)


def get_column_names(data):
    """Return the column names of a data structure."""
    # this is the tricky bit - it has to handle NamedTuple or Dictionary and "peek" into the iterable to get
    # the first item without losing it
    return column_names


def generate_insert_query(columns, conn):
    # this function has to use a DbHelper to work out the correct placeholder for conn, then
    # write the correct statement using either named or numeric placeholders
    return insert_query
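
A minimal sketch of the "peek" logic, assuming rows are either dictionaries or namedtuples. Because the first row has to be consumed to inspect it, this version returns the columns together with a re-chained iterable (the two-value return is an assumption, not part of the proposal above):

import itertools

def get_column_names(data):
    """Return (column_names, data), where data still yields every row."""
    data = iter(data)
    first_row = next(data)  # consume the first row to inspect it

    if hasattr(first_row, "_fields"):  # namedtuple
        column_names = list(first_row._fields)
    elif hasattr(first_row, "keys"):  # dict or dict-like
        column_names = list(first_row.keys())
    else:
        raise TypeError(f"Unsupported row type: {type(first_row)}")

    # Re-attach the consumed row so the caller still sees all the data
    return column_names, itertools.chain([first_row], data)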

--reinstall option implementation notes

Specification

(from @jostev at #2 (comment) )

Specification for --reinstall flag:

This will need an if (not) reinstall before calling _oracle_client_is_configured and another before deciding to call _install_instantclient.

  • --reinstall flag added to argparse with action="store_true"
  • setup_oracle_client takes reinstall as optional keyword argument (default=False)
  • if reinstall, then the _install_instantclient function should be called, regardless of the value of already_installed
  • unit test for calling setup_oracle_client(dummy_zip, reinstall=True), assuming driver is not configured but is installed
  • unit test for calling setup_oracle_client(dummy_zip, reinstall=False) assuming driver is not configured but is installed
  • unit test for calling setup_oracle_client(dummy_zip, reinstall=True) assuming driver is configured

The unit tests will need to use Pytest's monkeypatch and unittest.mock.Mock to mock out the following functions:

  • _oracle_client_is_configured (mock so return value is always False)
  • _check_install_status (mock so we can control return value)
  • _install_instantclient (so that tests can use assert_called_once() or assert_not_called())

Example of mocking from elsewhere in etlhelper:

monkeypatch.setattr(driver, 'connect', mock_connect)
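
A sketch of one of the unit tests, assuming the private helpers above live in an etlhelper.setup_oracle_client module and that the reinstall keyword has been added (both the module path and the keyword are assumptions of this ticket, not current behaviour):

from unittest.mock import Mock

import etlhelper.setup_oracle_client as soc  # module path assumed


def test_setup_oracle_client_reinstall_installed_not_configured(monkeypatch, tmp_path):
    # Driver is not configured, but files are already installed
    monkeypatch.setattr(soc, "_oracle_client_is_configured", Mock(return_value=False))
    monkeypatch.setattr(soc, "_check_install_status", Mock(return_value=True))
    mock_install = Mock()
    monkeypatch.setattr(soc, "_install_instantclient", mock_install)

    dummy_zip = tmp_path / "instantclient.zip"
    soc.setup_oracle_client(str(dummy_zip), reinstall=True)

    # reinstall=True should force installation even though already_installed is True
    mock_install.assert_called_once()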

References

Implementation notes

More detailed notes and ticket for the comment on the big issue at #2

From the comment at #2 (comment)

def setup_oracle_client(zipfile_location):

Add a reinstall keyword argument with a default of False.

if _oracle_client_is_configured():

Should reinstall even if the client is configured; do not return early here.

if not already_installed:

Needs to be modified so that it overrides this behaviour and reinstalls if asked for.

"""Parse args and run setup_oracle_client function."""

Add the argument here.

setup_oracle_client(zip_location)

Pass args.reinstall here.

To demo that it works

  • Whatever state of the installation we are at, make sure that _install_instantclient is called if reinstall is asked for.

Add a test called test_setup_oracle_client()

if _oracle_client_is_configured():

Should be mocked in the test with a Mock object that returns True or False as required.

already_installed = _check_install_status(install_dir, script_dir, bin_dir)

Needs to be Mocked out so that we can control the return value.

_install_instantclient(zipfile_location, install_dir, script_dir,

Replace this with a MockInstallInstantClient so we can assert it has been called.

Tests

test_has_been_installed_configured
test_has_been_installed_not_configured
and vice versa with hasn't been installed.

(2x2 matrix of options)

Example of mocking from elsewhere in etlhelper:

monkeypatch.setattr(driver, 'connect', mock_connect)

Increase exception coverage for failed cursor.execute calls

Summary

Current cursor.execute calls in iter_chunks, execute, and executemany only raise exceptions for helper.sql_exceptions. I'd like to expand the exception handling to include other PostgreSQL-related (psycopg2) exceptions such as InterfaceError.

Reasoning: I'm trying to track down a connection error that appears for long-running tasks in an Airflow ETL I am working on. I'm getting connection already closed and server terminated abnormally errors.

If I can figure out the connection errors and the solution is something that would be useful to others, I'll create another ticket to update etlhelper so it can better handle long-running Airflow workflows.

Note: it's also possible I'm missing something elsewhere, but this should help me narrow it down.

Further testing

Summary

See coverage html.

  • Test the zip location download (mock out the zipfile download, if the zipfile location starts with http)
  • Test the args parsing (can the args be mocked?)
  • Test the early return when the Oracle client is configured and it is not a reinstall (check log messages?)
  • Check _check_install_status when the install dir is empty.

wk buffer merge failure

When trying to load geometries from oracle with SDO_UTIL.TO_WKTGEOMETRY(shape) it errors with the following message:

ORA-13199: wk buffer merge failure
ORA-06512: at "MDSYS.SDO_UTIL", line 729
ORA-06512: at "MDSYS.SDO_UTIL", line 768

sql = f"select objectid, SDO_UTIL.TO_WKTGEOMETRY(shape) geom from BGS_GEOM.GB_50K_BEDROCK_GEOM_VOL"
with ORACLEDB.connect('ORACLE_PASSWORD') as conn:
   conn.outputtypehandler = output_type_handler
   for row in iter_rows(sql, conn):
        pass

I've managed to bypass this problem for now by using SDO_UTIL.TO_GMLGEOMETRY() instead of SDO_UTIL.TO_WKTGEOMETRY() and then converting it to an ogr geometry with ogr.CreateGeometryFromGML()

Add fetch methods

Summary

Adding fetchone provides a convenient way to get a single row tuple. Adding fetchmany and fetchall methods makes the set consistent with the DB API 2.0 cursor specification.

Description

There is currently no easy way to return a single row, equivalent to cursor.fetchone, using the etl methods. Instead, values need to be unpacked from a list or obtained by calling next on iter_rows results. Adding a fetchone function would make this nicer. It would just use iter_rows and return the first value.

fetchmany could be modelled on the cursor.fetchmany call signature and use itertools.islice to return the correct number of rows. Whether or not to use named arguments and default values should be investigated.

fetchall would be synonymous with get_rows.

These new functions would be defined in the etl module.
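
A rough sketch of how the three functions could wrap the existing iter_rows and get_rows (the signatures and the default size of 1 for fetchmany are assumptions to be confirmed):

from itertools import islice

from etlhelper.etl import get_rows, iter_rows


def fetchone(select_query, conn, **kwargs):
    """Return the first result row, or None if there are no rows."""
    return next(iter_rows(select_query, conn, **kwargs), None)


def fetchmany(select_query, conn, size=1, **kwargs):
    """Return up to `size` rows as a list."""
    return list(islice(iter_rows(select_query, conn, **kwargs), size))


def fetchall(select_query, conn, **kwargs):
    """Return all rows; synonymous with get_rows."""
    return get_rows(select_query, conn, **kwargs)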

Acceptance criteria

  • New fetchone function returns single tuple
  • New fetchmany function returns list of tuples of specified length
  • New fetchall function returns list of all tuples
  • Integration tests are provided for at least PostgreSQL implementation

Allow custom error handling for executemany (and execute?)

Summary

As an etl process writer, I would like to customise what happens when executemany encounters an error so that I can ignore errors if required.

Details

The current behaviour of executemany is to raise an ETLHelperInsertError if there is a problem inserting data (e.g. failed foreign key constraint). This aborts the whole process. In this situation, the commit_chunks flag determines whether the whole process is rolled back, or if chunks that have already been transferred are preserved.

Sometimes it is useful to just log an error and continue. Other times, users may want to do something else (e.g. update a database table with failure information). For maximum flexibility, we could allow users to pass in a function to be called when executemany raises an error.

msg = (f"SQL query raised an error.\n\n{query}\n\n"

etlhelper could provide two built-in options. The first replicates the current behaviour. The second allows the process to continue.

# Note: `query` and `helper` would need to be in scope (or passed in) for these messages
def log_and_raise(chunk, exc, logger):
    msg = (f"Failed to insert chunk: [{chunk[0]}, ..., {chunk[-1]}]\n"
           f"SQL query raised an error.\n\n{query}\n\n"
           f"Required paramstyle: {helper.paramstyle}\n\n{exc}\n")
    logger.debug(msg)
    raise ETLHelperInsertError(msg)


def log_and_continue(chunk, exc, logger):
    msg = (f"Failed to insert chunk: [{chunk[0]}, ..., {chunk[-1]}]\n"
           f"SQL query raised an error.\n\n{query}\n\n"
           f"Required paramstyle: {helper.paramstyle}\n\n{exc}\n")
    logger.error(msg)
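
Usage might then look like the following, assuming insert_sql, conn and rows are already defined (the on_error keyword and the etlhelper.exceptions location are the proposal of this ticket, not existing behaviour):

from etlhelper import executemany
from etlhelper.exceptions import log_and_continue  # proposed location

# Log failed chunks but keep inserting the remaining data
executemany(insert_sql, conn, rows, on_error=log_and_continue)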

Acceptance criteria

  • executemany has an on_error argument that takes a function of chunk, exc, logger that is called when an error is encountered
  • copy_rows also takes an on_error flag and passes it through to executemany
  • log_and_raise and log_and_continue are defined in etlhelper.exceptions
  • default option is log_and_raise
  • tests in test.integration.etl.test_etl_load cover each error handling function in executemany
  • tests in test.integration.etl.test_etl_transform cover each error handling function in copy_rows

Set up Travis CI

Set up testing with Travis CI for the GitHub repository.

  • MWE would include running the self-contained unit tests to get us started.

WIP: Add connection pooling option

Summary

As an API-developer, I want to be able to reuse database connections so that my application isn't slowed by repeatedly creating new ones.

Details

Creating database connections can be a bottleneck in code. Connection pools reduce this by maintaining some connections to the database between use. This won't make much difference for an ETL process, but is likely to be significant where etlhelper is used in an API.

All the Python drivers for server-based databases include options for Connection (sometimes called Session) pooling.

The aim of this issue is to provide easy session pooling in etlhelper.

Connection pools in different database types

SQLAlchemy uses pools and has some documentation here: https://docs.sqlalchemy.org/en/13/core/pooling.html

Questions about implementation

Adding pooling will make the etlhelper code slightly more complicated and using pools may have implications for multi-threaded applications, so there are some questions about how tightly we want to integrate pooling logic with etlhelper.

There are some things to consider:

  • All the etlhelper.etl functions accept a connection, so users that manage their own pools can still use the functions. We could just provide some recipes.
  • It would be straightforward to add a DbHelper.connection_pool() function that created a pool for the database. This would save users having to know the precise details. This would require DbHelpers to be singletons so that we could always access the reference to the pool. (Singleton behaviour is also required for #86)
  • The syntax for getting a connection varies between databases (acquire and release for cx_Oracle, getconn and putconn for psycopg2). A DbHelper.connect_from_pool() function could abstract this away. It would require a context manager (similar to the one used in sqlite connections) to close connections (see the sketch below).
  • pyodbc uses pooling by default. So does SQLAlchemy. Should we just always use pooled connections? What if it breaks some threaded applications?
  • The pool functions would only apply to Oracle and PostgreSQL - what do we do for pyodbc and sqlite? Raise errors / return normal connections with Warning?
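
A rough sketch of what a pooled-connection context manager could look like for PostgreSQL, based on psycopg2's ThreadedConnectionPool (the module-level pool and its arguments are illustrative assumptions, not a proposed API):

from contextlib import contextmanager

from psycopg2.pool import ThreadedConnectionPool

# Hypothetical module-level pool; in etlhelper this would hang off a singleton DbHelper
_pool = ThreadedConnectionPool(minconn=1, maxconn=10,
                               host="localhost", port=5432, dbname="etlhelper",
                               user="etlhelper_user", password="...")


@contextmanager
def connect_from_pool():
    """Yield a pooled connection and return it to the pool afterwards."""
    conn = _pool.getconn()
    try:
        yield conn
    finally:
        _pool.putconn(conn)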

Acceptance criteria

To be decided...

Add connection module functions as methods on DbParams

Summary

As a developer, I want to be able to create connections directly from DbParams so that I don't need to import extra functions.

Description

The functions in the connect.py module all take DbParams objects as the first input, and all must be individually imported to be used. If they were added to the DbParams objects as methods, it would save an import in the user code.

Current form:

from etlhelper import connect

with connect(MY_DB, 'PASSWORD_VARIABLE') as conn:
    do_something()

Implementation suggestion:

# db_params.py
from etlhelper.connect import connect, ...

...DbParams...
    def connect(self, password_variable, *args):
        return connect(self, password_variable, *args)

    def get_connection_string(self, password_variable):
        ...

    def get_sqlalchemy_connection_string(self, password_variable):
        ...

The function could then be used as:

with MY_DB.connect('PASSWORD_VARIABLE') as conn:
    do_something()

Acceptance criteria

  • DbParams.connect() works
  • DbParams.get_connection_string() works
  • DbParams.get_sqlalchemy_connection_string() works
  • All are proven by tests
  • README.md DbParams description includes functions
  • README.md examples use new syntax

Fix commit_chunks in load() function

Description

The commit_chunks parameter that gets passed to executemany by the load function has been hard-coded to True.
It should be passed down from the commit_chunks argument as set in the function call.

executemany(query, conn, rows, commit_chunks=True)


Acceptance criteria

  • Call to executemany passes commit_chunks=commit_chunks
  • Integration test demonstrates correct behaviour

Breaking changes

Summary

This ticket lists breaking changes that we would like to make to ETL Helper. These should coincide with a major version change.
Where an issue already exists, it can be linked here.

Some changes relate to creation of a Database class (see #97), which we would like to do in the medium term.

These changes should be made on a for_v1 branch.

Details

  • Deprecate read_lob flag. This functionality should be part of the connection configuration (see #110).
  • #155
  • #154
  • #152
  • #153
  • Refactor logging configuration. #125
  • Set minimum supported Python to 3.9 (to benefit from security updates and typing improvements)

If we have a Database class

  • Remove .connect() from DbParams classes. This is more appropriate for a Database class.

Make `paramstyle` a property on DbParams objects

Summary

As a developer, I want to find the required paramstyle for my database via MY_DB.paramstyle.

Description

The paramstyle is a DBAPI2.0 required value that describes whether to use ?, %s, :name or other values for the placeholder in a query. It is available as a parameter on the DbHelper class, but it would be convenient if it could be read from a user-facing DbParams object, as the DbHelpers are only used internally.

Implementation suggestion:

# db_params.py

    @property
    def paramstyle(self):
        return DB_HELPER_FACTORY.from_dbtype(self.dbtype).paramstyle

Acceptance criteria

  • DbParams have paramstyle attribute
  • README.md updated to explain how it can be used

Document individual functions

It would be useful to have a list of functions and methods in the library, along with their arguments, what they return, and what they do. Similar to how methods are documented for e.g. node-oracledb.

Add paramstyle to DbHelper classes and Extract and Insert errors

Summary

The paramstyle attribute should be added to DbHelper classes to give users access to that information.

Details

The DBAPI2 specifies that each database driver should record the parameter style used for parameterised queries. The value is different for PostgreSQL, Oracle and MSSQL. Adding this to the DbHelper class makes it easier for users to find the style for their database. Adding it to error messages will help users debug. This will be useful in the etl module.

Acceptance criteria

  • paramstyle attribute is added to base DbHelper metaclass to ensure all children have it
  • Unit tests demonstrate parameter is set to the correct value for each DbHelper type
  • msg text where BGSETLInsertError and BGSETLExtractError are raised by etl module contains: 'Database paramstyle is "named"' or similar.

SQL Server integration tests failing due to missing libs

Tests say this: pyodbc.Error: ('01000', "[01000] [unixODBC][Driver Manager]Can't open lib '/opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.3.so.1.1' : file not found (0) (SQLDriverConnect)")

The file does exist; we check what libraries it is linked to, as per https://stackoverflow.com/a/54575067/4319767

root@3b40b24c83b9:/app# ldd /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.3.so.1.1
        ...
        libcrypto.so.1.0.2 => not found
        libssl.so.1.0.2 => not found

We discover that Debian Stretch no longer provides libssl1.0.2; it is all libssl1.1 now.

SQLite driver

Summary

SQLite is a common and useful database format that would be good to support.

Details

Adding a new driver requires:

  • add a SQLiteDbHelper
  • register it with DB_HELPER_FACTORY
  • add unit tests for correct connection and SQLAlchemy strings
  • add integration test to copy rows from one table to another. This will require fixtures such as sqlite_test_table, sqlite_destination_table and SQL functions to create the tables, select the data and insert it. There should be tests for happy_path, bad_connect, bad_select, bad_insert.

The test_integration_etl.py module is getting large; the SQLite tests should be put in their own file e.g. test/integration/db/test_sqlite.py. This can serve as a model for other database types.

Acceptance criteria

  • sqlite.py created in db_helpers package and registered with DB_HELPER_FACTORY
  • DbParams module has filename attribute and validation to check it is set for dbtype of SQLITE
  • Unit tests for connection strings
  • Integration tests for copy (which inherently tests iter_rows and execute_bulk)

Optional Postgres Insert Speed-up

I've been doing some research into speeding up my ETLs for Oracle -> Postgres using etlhelper, and came across this article for loading data into postgresql. It looks like using psycopg2's copy_from in combination with io.StringIO could result in up to 5x performance improvements on the Postgres side. Is there a way to leverage this for etlhelper? Maybe an optional flag for the postgres db_helper's executemany function to use it? Would be amazing to be able to cut my ETL time down for multi-million-row tables.
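
For reference, the psycopg2 pattern described in such articles looks roughly like this (a sketch of the underlying driver calls, not an etlhelper API):

import csv
import io


def copy_rows_with_copy_from(rows, conn, table, columns):
    """Bulk-load an iterable of tuples into a PostgreSQL table via COPY."""
    buffer = io.StringIO()
    # Naive serialisation: assumes values contain no tabs or newlines
    writer = csv.writer(buffer, delimiter="\t")
    writer.writerows(rows)
    buffer.seek(0)

    with conn.cursor() as cursor:
        cursor.copy_from(buffer, table, sep="\t", columns=columns)
    conn.commit()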

Make consistent use of rowfactory or row_factory

Summary

We want to decide whether to write rowfactory or row_factory so that we are internally consistent.

Description

The spelling of row_factory is not consistent within etlhelper, as the following example demonstrates:

rows = get_rows(sql, conn, row_factory=dict_rowfactory, parameters=input_dict)

We should choose one and make it consistent. The sqlite3 module in the standard library uses row_factory, so that seems the best choice: https://docs.python.org/3.5/library/sqlite3.html#sqlite3.Connection.row_factory

Note that this is a breaking change, so it should be saved until a major release and advertised in advance with a deprecation warning.

Acceptance criteria

  • A deprecation warning has been put out warning of the upcoming change
  • No instances of rowfactory remain in the code base

Distribute a version as wheel

Summary

We want to distribute etlhelper via PyPI as a wheel so that it downloads and installs more quickly.

Description

Python wheels are binary files that contain packages. They can contain pre-compiled versions of required libraries, but we don't need those. The main advantage is that they install more quickly.

https://realpython.com/python-wheels/

Acceptance criteria

  • pip install etlhelper installs the wheel version on Linux
  • pip install etlhelper installs the wheel version on Windows

setup_oracle_client improvements

Summary

There are some improvements to setup_oracle_client that will make it more robust and useful.

Details

The script is not covered by tests. A unit test that installed from a dummy zip file could be created with minimal mocking. It would demonstrate functionality.

Data Pump provides similar functionality to pg_dump for PostgreSQL and is provided by the Instant Client tools package.

Error zipfile.BadZipFile is thrown if the download was aborted and the partially installed file remains. This is annoying.

Acceptance criteria

  • --with-sqlplus flag installs the SQL*Plus package and puts link into PATH (let's keep this simple for now)
  • --with-tool flag installs the instantclient tools package and puts link into PATH (let's keep this simple for now)
  • Unit test installs a dummy zip file into a temporary directory
  • Pre-existing oracle_lib_path_export files on the PATH are removed (use source instead)
  • $VIRTUAL_ENV is used first to determine target location for oracle_lib_path_export (use source instead)
  • --log DEBUG is replaced with -v flag
  • Logging format is cleaned up to replace DEBUG:root:
  • Corrupt, partially-downloaded zip files are automatically cleaned (cleanup before every install)
  • Can do just the environment setup part without having to download the client zip each time
  • --reinstall reinstalls packages, even if already correctly installed (e.g. to change instant client version)

Make executemany return row count

Summary

As a developer, I want executemany to return a count of processed rows so that I can use it elsewhere.

Description

executemany and related functions load, copy_rows, copy_table_rows should all return the number of rows that were processed. This would let the value be used downstream or in further logging.

Acceptance criteria

  • executemany returns the number of rows inserted
  • load returns the number of rows inserted
  • copy_rows returns the number of rows inserted
  • copy_table_rows returns the number of rows inserted

Implement simple table copy function

Summary

As a user, I would like a function to solve the common use case of transferring data between tables with no transformation.

Description

In many situations, data are copied from a table in one database to an equivalent table in another database.
All column names are the same and no other transformation is required.
A generic function could save the work required to define select and insert queries in this case.

The implementation would look something like:

def simple_copy(
    source_table, src_conn, dest_conn,
    dest_table=None, to_lower=False, where=None, columns=None):
    """
    Copy the contents of table from src_conn to dest_conn.  An alternative destination table name can be specified, as can a subset of columns or a `where` clause to filter the rows.
    """
    columns = get_column_names(source_table, src_conn)

    # Convert upper case column names to lower
    if to_lower:
        columns = [column.lower() for column in columns]

    column_list = ', '.join(columns)
    insert_placeholders = ', '.join(['%s' for _ in columns])

    if not dest_table:
        dest_table = source_table

    select_sql = f"SELECT {column_list} FROM {source_table}"
    insert_sql = f"INSERT INTO {dest_table} ({column_list}) VALUES ({insert_placeholders})"

    copy_rows(select_sql, src_conn, insert_sql, dest_conn)


def get_column_names(table, conn):
    """Return the column names of table."""
    sql = f"SELECT * FROM {table} LIMIT 1"
    result = fetchone(sql, conn, row_factory=dict_row_factory)
    return tuple(result.keys())

Tricky aspects are:

  • Automatic generation of placeholders depending on dest_conn database type (e.g. %s, ?)
  • Protecting where clause insertion against SQL injection attacks, although maybe that doesn't matter as this function will not be used with user-generated strings.
  • Column names with upper case or spaces must be surrounded by double quotes in PostgreSQL
  • Moving between databases may require column names to change to lower case.

Help with pyinstaller

Hello,

I wrote a simple program to create a SQLite database from an Oracle schema.
The script works very well and very fast.

Then, I tried to generate an exe file in order to quickly test the performance at different customers, i.e. without installing Python and all the dependencies.

I used pyinstaller, with the following command line:

pyinstaller --onefile --hidden-import=etlhelper.DBParams,etlhelper.connect,etlhelper.copy_rows,etlhelper.execute createdb.py
previously I tried:

pyinstaller --onefile --hidden-import=etlhelper createdb.py

The result is the same, when I execute the program from command line, I obtain:

Traceback (most recent call last):
  File "createdb.py", line 6, in <module>
    from etlhelper import DbParams
ModuleNotFoundError: No module named 'etlhelper'
[9504] Failed to execute script createdb

Probably I have made some mistakes, but I can't understand where. Do you have any experience using etlhelper with PyInstaller?

Thanks in advance, best regards and compliments for your work.

Giuseppe Garzotto

Refactor logging to add `log_to_stderr` function

Summary

As an ETL Helper user I want a simple way to turn on logging that doesn't add a handler to the ETL Helper logger by default so that I don't get multiple messages.

Description

The current ETL Helper logging setup is configured here:

# Prepare log handler. See this StackOverflow answer for details:

The configuration adds a logger with a custom handler that handles DEBUG messages in a custom way so that SQL queries and the other details that they include are easy to read. By default the level is set to WARNING so that it doesn't emit many messages. Logging can be turned on by taking a reference to the logger and setting the level to INFO or below. See:

### Debug SQL and monitor progress with logging

However, adding a handler to a library's logger by default is considered bad practice. It would be better if we didn't do this by default, but it would still be useful to be able to easily configure this logger, especially for novice Python users. A function to add the handler would solve this problem, e.g.

import logging
import sys

def log_to_stderr(level=logging.INFO):
    """Log ETL Helper messages to stderr at level."""
    logger = logging.getLogger("etlhelper")
    # Add a stderr handler with a simple format (sketch of the proposed function)
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    # Set level
    logger.setLevel(level)

Users would then just add the following to the top of their scripts:

import etlhelper

etlhelper.log_to_stderr()

Acceptance criteria

  • etlhelper logger has no handler by default
  • log_to_stderr() adds the current handler and sets logger to given level
  • README.md is updated

Consolidate MSSQL vs SQLServer

Summary

In some locations in the code Microsoft SQL Server is referred to as mssql, in others it is sql_server. One of these should be chosen and used consistently to avoid confusion.

Acceptance criteria

  • sql_server or mssql are used consistently throughout the code.

ld_library_prepend.sh not compatible with sh

I saw this while installing etlhelper in a container with sh rather than bash as the default shell (and using . rather than source to run $(setup_oracle_client)).

It doesn't like the [[ syntax here

if [[ "${{LD_LIBRARY_PATH}}" != "{lib_path}"* ]]
- "Double brackets are a bashism" https://scriptingosx.com/2018/02/single-brackets-vs-double-brackets/

I can switch my Docker build to bash so it's not a showstopper at all, but I bet others will run into it.

Publish documentation online

Current documentation is in README.md. A Sphinx-based HTML page would make it easier to organise, search and read. These pages should be automatically published online via CI.

Online location could be readthedocs or github.io pages.

  • Documentation is moved from README.md to Sphinx-based pages
  • CI pipeline publishes automatically on a new release

Add connection identifier to logging output

Summary

As an ETL pipeline user, I want to know which database connection log messages correspond to so that I can have a better idea of where my process is.

Description

When messages from ETL Helper logger are consumed by other handlers they can end up like this:

16:43:26 etlhelper INFO   Fetching rows
16:43:27 etlhelper INFO   108 rows returned
16:43:30 etlhelper INFO   Executing query

If the pipeline involves multiple databases, there is no way to know which database the messages correspond to. The messages should be updated to provide some information about the connection.

(It is probably not possible to put the DbParams variable name in the message there as I'm not sure if that is accessible when the log messages are written. If it was possible, that would be best. It may be necessary to add a "name" attribute to DbParams classes.)

Acceptance criteria

Log messages look like:

  • [ ] 16:43:30 etlhelper INFO Fetching rows from MY_DB

or if that isn't possible

Add ETL to project description

Summary

As someone looking for an ETL tool, I want ETL to appear in the project description so that ETL Helper comes up in PyPI searches.

Description

Searching PyPI for ETL doesn't actually bring ETL Helper up as a result. This is because it doesn't recognise the substring etl in the name and because ETL doesn't appear in the project description.

https://pypi.org/search/?q=etl

The current description is:

etlhelper is a Python library to simplify data transfer into and out of databases.

We should update it with:

etlhelper is a Python ETL library to simplify data transfer into and out of databases.

The description is stored in setup.py

Acceptance criteria

  • Description in setup.py has been updated
  • Description in README matches
  • Description on GitHub website matches (may need project admin to change)

Make read_lob() available for fetchone, fetchmany and fetchall

Summary

As an ETL user, I want read_lob to be available to all fetching methods so that I can read Oracle LOBs in results.

Description

Oracle LOBs are used to store geospatial data. They have to be explicitly read to be able to use them.

The keyword is currently only available for copy_rows but should be usable more widely.

Acceptance criteria

  • fetchone(..., read_lob=True) works
  • fetchmany(..., read_lob=True) works
  • fetchall(..., read_lob=True) works

Make execute return results of RETURNING clause in SQL

Summary

As an etl user inserting a single row, I want to access the autogenerated id of the new item so that I can use it later in my process.

Description

PostgreSQL, Oracle and SQL Server can return values generated by INSERT (and other commands), such as autogenerated primary key ids.

INSERT INTO my_table (message) VALUES ('hello') RETURNING id

These returned values can be accessed via cursor.fetchone()[0] on the cursor used to execute the command.
As etlhelper hides the cursor from the user, this is not possible.

At the moment, ETL Helper's execute returns None. We could modify it to return the returned value instead, e.g. by calling returned_value = cursor.fetchall() and returning it at the end of the method.

cursor.execute(query, parameters)
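
A possible shape for the change (sketch only; how a "no result set" condition surfaces varies by driver, hence the broad except):

def execute(query, conn, parameters=()):
    """Run query against connection, returning rows from a RETURNING clause if present."""
    cursor = conn.cursor()
    try:
        cursor.execute(query, parameters)
        try:
            returned_values = cursor.fetchall()  # rows from RETURNING, if any
        except Exception:  # driver-specific "no results to fetch" errors
            returned_values = None
        conn.commit()
    finally:
        cursor.close()
    # Return None rather than an empty list when nothing was returned
    return returned_values or None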

Note that:

  • If there is no return value, then we should still return None as before (as opposed to [] or similar)
  • Should we run the results through the namedtuple_row_factory for more convenient access?
  • We should check what happens for DELETE and UPDATE clauses, which can also do RETURNING
  • If it is possible that RETURNING returns lists of items, we should use cursor.fetchall() to ensure that we capture them all
  • This behaviour can be on by default because users have to explicitly add a RETURNING clause
  • executemany doesn't return anything, so we don't need to worry about that
  • If there isn't a one-size-fits-all solution, then a DbHelper.get_return_value(cursor) method may be required to implement database-specific solutions

Acceptance criteria

  • execute returns None or list of returned values when used with RETURNING (or equivalent)
  • Integration test for PostgreSQL
  • Integration test for Oracle
  • Integration test for MS SQL Server
  • README docs updated with explanation

Further reading

PostgreSQL RETURNING: https://www.postgresql.org/docs/current/sql-insert.html
Psycopg2 example: https://stackoverflow.com/questions/5247685/python-postgres-psycopg2-getting-id-of-row-just-inserted

Oracle RETURNING INTO: https://docs.oracle.com/cd/B28359_01/appdev.111/b28370/returninginto_clause.htm#LNPLS01354

MS SQL SERVER with pyodbc uses "OUTPUT" and only works if you read the value before committing the transaction. https://stackoverflow.com/a/60872000/3508733

SQLITE doesn't support "RETURNING". You can use SELECT last_insert_rowid() if you need to get the last ID, but this is beyond the scope of this method.
https://stackoverflow.com/questions/6242756/how-to-retrieve-inserted-id-after-inserting-row-in-sqlite-using-python

Warning message about missing psycopg2, even when not using Postgres

Summary

When using etlhelper in a new virtual environment without installing PostgreSQL dependencies, there are occasional error messages about missing psycopg2. These should be removed.

TODO: add steps to reproduce

Acceptance criteria

  • Missing psycopg2 messages are only displayed if attempting to connect to PostgreSQL.

Make setup_oracle_client fail gracefully if zip file is unavailable

Summary

As an etlhelper user, I want a helpful message if setup_oracle_client cannot unpack the zip file, so that I can understand what is wrong without digging through stack traces.

Description

The setup_oracle_client function called by the setup_oracle_client script crashes with a stack trace if there is a problem downloading or opening the zip file. This is unhelpful for users as the cause of failure may not be obvious. It would be better to fail gracefully.

This could be achieved by putting the call to _install_instantclient within a try... except block that catches the exception and calls sys.exit(1). If helpful failure messages are raised by the downloading/unzipping functions, their text could be logged after they are caught, just before exiting.

_install_instantclient(zip_location, install_dir,

Possible implementation

try:
    _install_instantclient(zip_location, install_dir, ld_library_prepend_script)
except (SomeError, AnotherError) as e:
    logger.error(e.args[0])
    sys.exit(1)

Acceptance criteria

  • Failure because the URL cannot be downloaded causes script to exit with helpful log message
  • Failure because file path doesn't exist causes script to exit with helpful log message
  • Failure because file is not a zip file causes script to exit with helpful log message
  • _download_zipfile docstring is updated
  • All failure modes are covered by integration tests

Add `select_sql_suffix` and `insert_sql_suffix` to copy_rows and load modules

Summary

As an ETLHelper user I want to be able to add WHERE clauses to copy_table_rows so that I have more flexibility while still not having to write full queries

Description

copy_table_rows runs a "SELECT * FROM table" query, and load runs an "INSERT INTO table (...) VALUES (...)" query. In the first case, it would be nice to be able to add a WHERE or ORDER BY or LIMIT to the query. In the second case it would be nice to add "ON CONFLICT DO NOTHING" or something similar.

Adding select_sql_suffix and execute_sql_suffix parameters to the respective functions, which append whatever text is given, would be a simple way to allow this.
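
Internally, the suffix would simply be appended to the generated SQL, e.g. (the function name below is illustrative only):

def build_select_query(table, select_sql_suffix=None):
    """Build the SELECT query used by copy_table_rows, with an optional suffix."""
    select_sql = f"SELECT * FROM {table}"
    if select_sql_suffix:
        select_sql += f" {select_sql_suffix}"
    return select_sql


# build_select_query("my_table", "WHERE id > 1000 ORDER BY id")
# -> "SELECT * FROM my_table WHERE id > 1000 ORDER BY id"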

Acceptance criteria

  • copy_table_rows accepts select_sql_suffix and execute_sql_suffix
  • load accepts execute_sql_suffix
  • README.md is updated to explain use cases described above

Investigate pyODBC fast_executemany option

Summary

We want to investigate the fast_executemany option for pyODBC cursors to see if it makes much difference and if it can be easily implemented.

Description

The pyODBC driver includes an option for fast_executemany.
It can be switched on by setting a flag on a cursor e.g.

cursor = conn.cursor()
cursor.fast_executemany = True
cursor.executemany(...)

We want to see if it makes a big difference, and if so, how it could be implemented.

Note that the fast_executemany flag is only recommended for Microsoft SQL Server ODBC driver and can modify behaviour of inserts. Therefore it should be optional and off by default (or should it?).

The implementation would involve either:

  • a custom executemany flag for mssql that takes an option, combined with a change in etl.executemany to accept and use the flag, and also in etl.copy_rows, or
  • a change in db_helpers.mssql.py to modify the way that cursors are created. For example, the db_helpers.sqlite.py class has a custom cursor method. The flag would need to be passed down from when the connection is created.

The first option would be simpler, but it would clutter the interface of the functions with options that are rarely used.

Acceptance criteria

This ticket is just to investigate whether the flag is worth using. The output is comments on this ticket that:

  • compare executemany performance with and without the flag being switched on (using normal DBAPI commands)
  • describe any other changes/differences that arise from using fast_executemany
  • have high-level description of each implementation option (including example commands in each style) and discusses pros and cons of each in more detail
  • discuss the pros/cons of fixing it on (simpler implementation vs unexpected behaviour vs who the users are)

References

Add database network check to DbParams

Summary

Adding an is_reachable() method to DbParams would allow users to easily check if they can reach their target database over the network. This can be helpful in debugging situations.

Details

The method could be used as follows:

if not my_db.is_reachable():
    raise ETLHelperError("network problems")

It would not require any other parameters and can use Python's socket module to check whether it is possible to connect to the required host and port. There is already a simple implementation of this in the testing code of etlhelper:

def db_is_unreachable(host, port):
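
A minimal sketch of the method using the standard library socket module (the timeout value is an assumption; this only applies to server-based databases with host and port set):

import socket


def is_reachable(self, timeout=5):
    """Return True if the database host accepts connections on its port."""
    try:
        with socket.create_connection((self.host, int(self.port)), timeout=timeout):
            return True
    except OSError:
        return False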

Acceptance criteria

  • is_reachable() function added to DbParams class
  • conftest.py uses new method
  • README.md describes how to use.

Add __setattr__ to DbParams

Summary

We want DbParams to have attribute setting so that we can do MY_DB.user = 'me'.

Description

t.b.c.

Acceptance criteria

  • Can set valid attributes on DbParams instances using dot notation to access member variables, e.g. my_db.user = "FooBar"
  • Only valid attribute names can be set, e.g. user, db_type, etc. An error is raised when attempting to add arbitrary attributes (??) (see the sketch below)
  • db.is_reachable() has docstring (this was part of #33, but work could be done here while editing DbParams)
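
A possible implementation of the attribute check, assuming DbParams keeps a class-level set of permitted attribute names (the set below is illustrative):

class DbParams:
    # Hypothetical set of permitted attribute names
    VALID_KEYS = {"dbtype", "host", "port", "dbname", "user", "odbc_driver", "filename"}

    def __setattr__(self, name, value):
        # Allow MY_DB.user = "me" but reject arbitrary attributes
        if name not in self.VALID_KEYS:
            raise AttributeError(f"'{name}' is not a valid DbParams attribute")
        super().__setattr__(name, value)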

Configure logging

Description

Make all etlhelper modules use the same logger, with a single point of configuration. This will allow scripts using etlhelper to turn logging on and off easily.

By default, the logging level should be set to logging.WARNING. At logging.INFO, information on the number of rows transferred should be printed. At logging.DEBUG, other information such as the establishing of connections, the DbParams used and the SQL query text should be printed.

Implementation

Implementation may look like adding the following to etlhelper/__init__.py:

import logging

etlhelper_logger = logging.getLogger('etlhelper')
etlhelper_logger.setLevel(logging.WARNING)

and adding this to the modules:

import logging

logger = logging.getLogger('etlhelper')

Changing the level in an ETL script would be:

import logging
from etlhelper import etlhelper_logger

etlhelper_logger.setLevel(logging.DEBUG)

Acceptance criteria

  • All etlhelper modules use same logger
  • By default, logger only returns messages of WARNING or higher
  • ETL scripts can configure logging in one place
  • README.md describes how to use logger

Add describe_columns function

Summary

As an ETLHelper user, I would like to get metadata on a table e.g. column names and types so that I can programmatically generate SQL queries.

Description

Hard-coded SQL queries can be cumbersome in cases where tables have many columns. etlhelper provides load to programmatically generate insert queries, and copy_table_rows programmatically copies the rows from SELECT * FROM table; however, these only cover the simple cases.

A describe_columns(table, conn) function would return metadata about a database table, with column names and types as a minimum. These data can then be used to programmatically build a query. For example, a geospatial insert query could wrap the geometry column in ST_GeomFromText(%(geom)s, 4326) to convert WKT into a geometry column.

Implementation

The SQL query used to select the metadata is different for different database types. As such, it will need to be defined on the DbHelper classes. The queries will use the following internal tables / features:

The describe_columns(table, conn) function can live in a new module (utils). It should execute the SQL query from the DbHelper and return a list of Column named tuples with attributes for at least "name" and "type" (and possibly "is_primary_key", "not_null", "default_value", which are provided in SQLite).

The function should also raise an exception if the table name is ambiguous, e.g. if the same table appears in different schemas. One way to detect this would be to detect non-unique column names. This means that the function should accept simple and schema-qualified names.
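
As an illustration of the intended return value, a SQLite-only sketch using PRAGMA table_info (for other databases the query would come from the DbHelper; the extra Column fields follow SQLite's output):

from collections import namedtuple

Column = namedtuple("Column", ["name", "type", "not_null", "default_value", "is_primary_key"])


def describe_columns(table, conn):
    """Return a list of Column tuples describing `table` (SQLite connection assumed)."""
    cursor = conn.execute(f"PRAGMA table_info({table})")
    # PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
    return [Column(name, type_, bool(notnull), dflt_value, bool(pk))
            for _cid, name, type_, notnull, dflt_value, pk in cursor.fetchall()]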

Acceptance criteria

Calling describe_columns(table, conn) against a table returns data for the following connection types:

  • SQLite
  • Oracle
  • PostgreSQL
  • MS SQL server

Bad column names raise exception:

  • SQLite
  • Oracle
  • PostgreSQL
  • MS SQL server

Schema qualified names can be used:

  • Oracle
  • PostgreSQL
  • MS SQL server

Additional criteria

  • describe_columns is available at top level etlhelper import
  • Subclass inheritance requires the query to be defined.

ModuleNotFoundError: No module named 'cx_Oracle' on `setup_oracle_client`

Behaviour

  • create a fresh virtual environment
  • pip install etlhelper
  • run setup_oracle_client

The following error is thrown: ModuleNotFoundError: No module named 'cx_Oracle'

Better behaviour

ETL Helper should catch this exception and print a nicer message explaining that cx_Oracle is required and that it can be installed via pip install etlhelper[oracle] or pip install cx_Oracle.
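
For example, the script entry point could catch the missing driver up front (a sketch; the real function layout may differ):

import sys


def main():
    """Entry point for the setup_oracle_client script."""
    try:
        import cx_Oracle  # noqa: F401 - only imported to confirm the driver is present
    except ModuleNotFoundError:
        print("cx_Oracle is required: install it with 'pip install etlhelper[oracle]' "
              "or 'pip install cx_Oracle'", file=sys.stderr)
        sys.exit(1)
    # ... continue with the normal setup_oracle_client logic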

can't select COUNT(*) with default namedtuple rowfactory

Below is the traceback, with the Oracle driver. It looks like it is induced by supplying field names that are not valid identifiers, derived from the cursor description, to namedtuple() like this. I should make time to test with other backends / submit a PR, but don't have any today, so this issue is a placeholder/FYI. I can get past it for now by switching to the dict_rowfactory.

namedtuple('Row', field_names=column_names)

get_rows("select count(*) from BGS.LXN_UNIT", db())

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-529dbe5276a5> in <module>
----> 1 get_rows("select count(*) from TABLENAME", db())

~/env/lib/python3.6/site-packages/etlhelper/etl.py in get_rows(select_query, conn, parameters, row_factory, transform)
    133     """
    134     return list(iter_rows(select_query, conn, row_factory=row_factory,
--> 135                           parameters=parameters, transform=transform))
    136 
    137 

~/env/lib/python3.6/site-packages/etlhelper/etl.py in iter_rows(select_query, conn, parameters, row_factory, transform, read_lob)
    115     for chunk in iter_chunks(select_query, conn, row_factory=row_factory,
    116                              parameters=parameters, transform=transform,
--> 117                              read_lob=read_lob):
    118         for row in chunk:
    119             yield row

~/env/lib/python3.6/site-packages/etlhelper/etl.py in iter_chunks(select_query, conn, parameters, row_factory, transform, read_lob)
     61 
     62         # Set row factory
---> 63         create_row = row_factory(cursor)
     64 
     65         # Parse results

~/env/lib/python3.6/site-packages/etlhelper/row_factories.py in namedtuple_rowfactory(cursor)
     17     column_names = [d[0] for d in cursor.description]
     18 
---> 19     Row = namedtuple('Row', field_names=column_names)
     20 
     21     def create_row(row):

/usr/local/lib/python3.6/collections/__init__.py in namedtuple(typename, field_names, verbose, rename, module)
    399         if not name.isidentifier():
    400             raise ValueError('Type names and field names must be valid '
--> 401                              'identifiers: %r' % name)
    402         if _iskeyword(name):
    403             raise ValueError('Type names and field names cannot be a '

ValueError: Type names and field names must be valid identifiers: 'COUNT(*)'
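
One possible fix (untested here) would be to pass rename=True when building the namedtuple, so that invalid identifiers such as COUNT(*) are replaced with positional names:

from collections import namedtuple

column_names = ["COUNT(*)"]
Row = namedtuple("Row", field_names=column_names, rename=True)
print(Row._fields)  # ('_0',) - invalid identifiers become positional names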

Add abort event for threaded operation

Summary

As an ETLHelper user I want a way to abort ETL processes when they are running in threads so that I can use ETLHelper within GUI applications.

Details

We recently had a case where an ETL Helper script was used in a Qt-based GUI application. The ETL work was done in a separate thread by a worker to prevent it freezing the GUI. However, this meant that there was no easy way to abort the running job. For a normal script you can just use CTRL-C.

Abort Event setup

Using a threading Event could allow users to send a signal to abort a running job. It could work as follows:

# etlhelper/abort.py
import threading

from etlhelper.exceptions import EtlHelperAbort

abort_event = threading.Event()

def abort():
    """Set an abort event."""
    abort_event.set()

def raise_for_abort(message="Abort event was set"):
    """Raise EtlHelperAbort exception with message if abort_event is set."""
    # Default message lets raise_for_abort() be called without arguments, as below
    if abort_event.is_set():
        raise EtlHelperAbort(message)

Implementation

In etl.py, we just need to import the raise_for_abort method and call it at the start of the loops in iter_chunks and executemany. In the GUI application, it would be necessary to from etlhelper.abort import abort and to call abort() where you want the script to abort.

In iter_chunks, the row factory function could be modified to replace this:

create_row = row_factory(cursor)

with:

create_row = row_factory(cursor)
def create_row_or_abort(row):
    """Check for abort event before creating row."""
    raise_for_abort()
    return create_row(row)

Then replace create_row further down with create_row_or_abort.

For executemany, the check could happen when rows are read from the chunker by replacing this:

chunk = [row for row in chunk if row is not None]

with:

chunk = []
for row in dirty_chunk:
    raise_for_abort()
    if row is not None:
        chunk.append(row)

after renaming the raw output from the chunker as dirty_chunk.

Things to consider

  • what clean-up jobs are required after a task is aborted?
  • should we call raise_for_abort in the row factories and chunkers so that the abort is thrown after a row instead of after a chunk? This will mean converting some list comprehensions into full-fat loops.
  • are there speed implications?
  • how do you write tests for this (a custom transform with a time.sleep would slow things down to give time to run it)?
  • Will this pure Python implementation also work with QThreads?

Acceptance criteria

  • Calling abort during iter_chunks raises EtlHelperAbort exception
  • Calling abort during executemany raises EtlHelperAbort exception
  • Integration tests cover both scenarios

References

Note: these could make a nice recipe one day, based on BGS' Sigmalite importer code.
