asyncpgsa's Introduction

asyncpgsa

A Python wrapper around asyncpg for use with SQLAlchemy.

Backwards incompatibility notice

Since this library is still pre-1.0, the API might change. I will do my best to minimize changes, and I will mention any that are made here. You should pin the version for production apps.

  1. 0.9.0 changed the dialect from psycopg2 to pypostgresql. This should be mostly backwards compatible, but if you notice weird issues, this is why. You can now plug in your own dialect using pg.init(..., dialect=my_dialect), or by setting the dialect on the pool. See the top of the connection file for an example of creating a dialect. Please let me know if the change from psycopg2 to pypostgresql broke you. If this happens enough, I might make psycopg2 the default.

  2. 0.18.0 Removes the Record Proxy objects that would wrap asyncpg's records. Now asyncpgsa just returns whatever asyncpg would return. This is a HUGE backwards incompatible change but most people just used record._data to get the object directly anyways. This means dot notation for columns is no longer possible and you need to access columns using exact names with dictionary notation.

  3. 0.18.0 Removed the insert method. We found this method was confusing and unnecessary, as SQLAlchemy can do it for you if you define your table with a primary key.

  4. 0.27.0 Now only compatible with version 0.22.0 and greater of asyncpg.

sqlalchemy ORM

Currently this repo does not support SA ORM, only SA Core.

As we at Canopy do not use the ORM, feel free to open a PR if you would like ORM support. You would need to create an "engine" interface, and that should be it. Then you can bind your sessions to the engine.

sqlalchemy Core

This repo supports sqlalchemy core. Go here for examples.

Docs

Go here for docs.

Examples

Go here for examples.

install

pip install asyncpgsa

Note: You should not have asyncpg in your requirements at all. This lib will pull down the correct version of asyncpg for you. If you have asyncpg in your requirements, you could get a version newer than this one supports.

Contributing

To contribute or build this locally see contributing.md

FAQ

Does SQLAlchemy integration defeat the point of using asyncpg as a backend (performance)?

I don't think so. asyncpgsa is written so that any query can be a string instead of an SA object; in that case you get near-asyncpg speeds, as no SA code is run.

However, when running SA queries, it still seems to be faster than aiopg. Here is a very basic timeit test comparing the two: https://gist.github.com/nhumrich/3470f075ae1d868f663b162d01a07838

aiopg.sa: 9.541276566000306
asyncpgsa: 6.747777451004367

So it seems asyncpg is still faster, or in other words, this library doesn't add any overhead that is not also in aiopg.sa.

Versioning

This software follows Semantic Versioning.

asyncpgsa's People

Contributors

achimnol, alairock, alvassin, amatanhead, arusahni, carsonyl, danpozmanter, denisdubovitskiy, disconnect3d, fantix, gr1n, hardtack, hatarist, iamrajhans, jaawerth, kamikaze, matemax, mattrasband, mosquito, nhumrich, nikitagromov, okanakbulut, omarryhan, petedmarsh, rlittlefield, skuda, stachlewski, stranger6667, thedrow, vmagamedov

asyncpgsa's Issues

`pg.insert` lost all arguments

pg.insert simply doesn't work:

Traceback (most recent call last):
  File "test.py", line 19, in <module>
    loop.run_until_complete(main())
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "test.py", line 6, in main
    val = await pg.insert('INSERT INTO users (nickname) VALUES ($1)', 'fantix')
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpgsa/pgsingleton.py", line 91, in insert
    timeout=timeout)
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpgsa/connection.py", line 138, in insert
    return await self.fetchval(query, *params, *args, **kwargs)
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpgsa/connection.py", line 123, in fetchval
    return await super().fetchval(query, *args, **kwargs)
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpg/connection.py", line 358, in fetchval
    data = await self._execute(query, args, 1, timeout)
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpg/connection.py", line 1188, in _execute
    return await self._do_execute(query, executor, timeout)
  File "/Volumes/Home/fantix/py36env/lib/python3.6/site-packages/asyncpg/connection.py", line 1209, in _do_execute
    result = await executor(stmt, None)
  File "asyncpg/protocol/protocol.pyx", line 181, in bind_execute (asyncpg/protocol/protocol.c:66715)
  File "asyncpg/protocol/prepared_stmt.pyx", line 103, in asyncpg.protocol.protocol.PreparedStatementState._encode_bind_msg (asyncpg/protocol/protocol.c:62407)
ValueError: number of arguments (0) does not match number of parameters (1)

Looks like the overridden _execute method simply ignores the given args and only uses the args found in the query object, if query is one:

    def _execute(self, query, args, limit, timeout, return_status=False):
        query, args = compile_query(query, dialect=self._dialect)
        return super()._execute(query, args, limit, timeout,
                                return_status=return_status)

The same happens with insert when using a SQLAlchemy Core insert statement object with values included, because the compile is done in insert, before _execute.
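One way to picture the behaviour the report asks for is to compile only when the query is not already a string, and to keep the caller's positional arguments either way. The sketch below is not asyncpgsa's code: `FakeStatement`, `fake_compile` and `resolve_query` are hypothetical stand-ins used only to show the argument handling.

```python
from typing import Any, Sequence, Tuple


class FakeStatement:
    """Hypothetical stand-in for a SQLAlchemy statement with bound values."""

    def __init__(self, sql: str, params: Sequence[Any]):
        self.sql = sql
        self.params = tuple(params)


def fake_compile(stmt: FakeStatement) -> Tuple[str, tuple]:
    # Stand-in for compile_query(): returns SQL text plus compiled arguments.
    return stmt.sql, stmt.params


def resolve_query(query: Any, args: Sequence[Any]) -> Tuple[str, tuple]:
    """Return (sql, args) without dropping arguments passed by the caller."""
    if isinstance(query, str):
        # Raw SQL: nothing to compile, so the caller's args must survive.
        return query, tuple(args)
    sql, compiled_args = fake_compile(query)
    # Statement object: compiled args first, then any extra caller args.
    return sql, compiled_args + tuple(args)
```

With this shape, `resolve_query('INSERT INTO users (nickname) VALUES ($1)', ('fantix',))` keeps the single argument instead of reporting zero arguments for one parameter.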

Benchmark

The only reason I see to use asyncpg is its performance benefits. Do you have any benchmarks comparing your lib with aiopg? It's not obvious to me that the SQLAlchemy layer doesn't diminish asyncpg's advantages.

Cursors are not supported(?) with the pool.transaction() interface

I have declared a connection pool and tried to call cursor like so:

pool = asyncpgsa.create_pool()
async with pool.transaction() as t:
  async with t.query("SELECT * FROM whatever") as cursor:
    # ...

The query() method is currently missing.
It might be a documentation problem though.

SAConnection not supporting prepare statement method

Hi,

We find the library very handy, above all because it automagically compiles the queries. However, we noticed (and it's kinda annoying) that a connection from the pool does not support the prepare method. I think it is 5 lines of code inside SAConnection at most, and it would keep the library consistent and save us from calling compile_query by hand when preparing statements.

Record subscript operator

I am pretty new to SQLAlchemy, which makes me think I am missing something and that there is a good reason to keep things as they are now, so before creating a pull request to add this I need to ask:

Why doesn't the Record type handle the subscript operator []?

row.type_id
1
row['type_id']
Traceback (most recent call last):
  Debug Probe, prompt 53, line 1
builtins.TypeError: 'Record' object is not subscriptable

It's pretty easy to add to the record class the method

def __getitem__(self, key):
    if isinstance(key, Column):
        key = key.name

    return self.__getattr__(key)

So we can do:

row.type_id
1
row['type_id']
1
row[models.t_game_server_type.c.type_id]
1

Could this create any problem? I would like to be as close as possible to the SQLAlchemy Core syntax.

Thanks!

Tag releases and add changelog

Please tag releases/versions and add a changelog, to make it easier to follow the development of this library. Thanks!

Use of modulo operator compiles a query unrecognized by asyncpg

When a statement that uses the modulo operator is compiled, the default psycopg2 dialect query compiler (PGCompiler_psycopg2) used by asyncpgsa has some sort of special treatment of percent signs that results in the compiled query having %% instead of %, which isn't recognized by asyncpg.

For example, select([literal_column('10') % 2]), results in this logged query: SELECT 10 %% $1 AS anon_1. This gets rejected by asyncpg: asyncpg.exceptions.UndefinedFunctionError: operator does not exist: integer %% unknown.

I noticed that the pypostgresql dialect doesn't deviate from the default query compiler, so I changed connection.py to use that dialect instead. It generates a query with only one % as expected, which made asyncpg happy. But I don't know the implications of doing this, and didn't go further than that.

Error on pg.init

When I try to invoke pg.init, the module throws this cryptic error. I have been looking for this required init keyword in the documentation but could not find anything. Am I missing something? Please help.

The code that triggered it:

import asyncio
from asyncpgsa import pg

if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete( pg.init( dsn="postgres:///tranql_bot" ) )

And error:

Traceback (most recent call last):
  File "/usr/lib/python3.5/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.5/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/home/diwank/github.com/creatorrr/tranql-bot/app/main.py", line 12, in <module>
    loop.run_until_complete( pg.init( dsn="postgres:///tranql_bot" ) )
  File "/usr/lib/python3.5/asyncio/base_events.py", line 457, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 292, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/diwank/.envs/tranql/lib/python3.5/site-packages/asyncpgsa/pgsingleton.py", line 33, in init
    self.__pool = await create_pool(*args, **kwargs)
  File "/home/diwank/.envs/tranql/lib/python3.5/site-packages/asyncpgsa/pool.py", line 75, in create_pool
    **connect_kwargs)
TypeError: init() missing 1 required keyword-only argument: 'init'

process_result_value callback for column type is not handled

I need a TypeDecorator for my date column. SQLAlchemy + psycopg2 handles it correctly. Is it possible to make it work with asyncpgsa?

import asyncio
from datetime import datetime

from asyncpgsa import PG
from pytz import timezone
from sqlalchemy import (
    Column, DateTime, Integer, MetaData, Table, TypeDecorator, create_engine
)


DB_URL = 'postgresql://user:[email protected]/db'


class DateTime_(TypeDecorator):
    impl = DateTime

    def __init__(self):
        TypeDecorator.__init__(self, timezone=True)

    def process_bind_param(self, value, dialect):
        if value is not None:
            return datetime.fromtimestamp(value, timezone('UTC'))

    def process_result_value(self, value, dialect):
        return int(value.timestamp())


metadata = MetaData()
example_table = Table('example', metadata,
                      Column('id', Integer, primary_key=True),
                      Column('some_date', DateTime_))

engine = create_engine(DB_URL)

# Create table & add row
metadata.create_all(engine)
engine.execute(example_table.insert().values({
    'some_date': int(datetime.now().timestamp())
}))

# psycopg2 with sqlalchemy handles process_result_value correctly
rows = engine.execute(example_table.select()).fetchall()
assert isinstance(rows[0]['some_date'], int)


# asyncpgsa does not handle process_result_value callback
async def main():
    db = PG()
    await db.init(DB_URL)
    rows = await db.fetch(example_table.select())
    assert isinstance(rows[0]['some_date'], datetime)  # True
    assert isinstance(rows[0]['some_date'], int)  # False!


asyncio.run(main())

Perhaps such callbacks should be called in SAConnection.execute?

Suggestion: Update create_pool() for forward compatibility with asyncpg

Recently asyncpg added new constructor arguments to its Pool class, such as max_inactive_connection_lifetime.
This broke my code, which uses asyncpgsa rather than asyncpg directly, because the asyncpgsa.Pool class directly subclasses asyncpg.Pool and those new arguments are mandatory.

To remedy such forward-compatibility problems, asyncpg provides a factory method which sets default values for newly introduced arguments.

I think it would be nice to have the same factory/wrapper method for creating connection pools in asyncpgsa as well.
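The factory idea can be sketched as follows. This is only an illustration of the pattern, not asyncpg's or asyncpgsa's implementation: `StrictPool` is a hypothetical class whose constructor has no defaults, and the default values chosen in `create_pool` are assumptions for the example.

```python
class StrictPool:
    """Hypothetical pool whose constructor requires every option explicitly."""

    def __init__(self, dsn, *, min_size, max_size,
                 max_inactive_connection_lifetime):
        self.dsn = dsn
        self.min_size = min_size
        self.max_size = max_size
        self.max_inactive_connection_lifetime = max_inactive_connection_lifetime


def create_pool(dsn, *, min_size=10, max_size=10,
                max_inactive_connection_lifetime=300.0,
                pool_class=StrictPool, **pool_kwargs):
    """Factory that supplies defaults, so callers survive new mandatory arguments."""
    return pool_class(
        dsn,
        min_size=min_size,
        max_size=max_size,
        max_inactive_connection_lifetime=max_inactive_connection_lifetime,
        **pool_kwargs,
    )
```

Callers that go through the factory keep working when the constructor grows a new required keyword, because only the factory has to learn a default for it.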

Using asyncpgsa with alembic in `env.py`

I'm working on a new project and I thought I'd try out asyncpgsa. I'm using alembic to manage DB migrations, but as I have things set up right now it needs psycopg2 in order to run - it's not a big problem but I feel a bit silly having both asyncpgsa and psycopg2 in my project as dependencies.

I saw that you mentioned using alembic in this issue:

#37 (comment)

I wondered if you used alembic with asyncpgsa and, if so, could you share your alembic env.py?

Any convenient way to get the column names from result?

I want to turn the result into a pandas.DataFrame, so I need an easy way to get the values as a list of lists and the keys as a list.
Currently I can only do

result = await conn.fetch(expression)
df = pd.DataFrame(list(dict(i) for i in result))

which does not seem very efficient. I would like something like

result = await conn.fetch(expression)
df = pd.DataFrame(result.values(), columns=result.keys())

to avoid building a dict for every record.

thx
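A possible workaround, assuming each returned row exposes `keys()` and `values()` the way a mapping does (asyncpg's Record objects do), is to read the column names once from the first row and collect the values as tuples. `split_records` is a hypothetical helper name, and plain dicts stand in for records here.

```python
def split_records(records):
    """Return (columns, rows) from a list of mapping-like records."""
    if not records:
        # An empty result carries no rows to read the column names from.
        return [], []
    columns = list(records[0].keys())
    rows = [tuple(record.values()) for record in records]
    return columns, rows
```

The two return values can then be passed on as `pd.DataFrame(rows, columns=columns)`, which avoids building one dict per record.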

Cannot compile DropTable and CreateTable queries

Hello!

I tried to create a table with this code:

import asyncio

from asyncpgsa import pg
import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable, DropTable

users = sa.Table(
    'users', sa.MetaData(),
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.VARCHAR(255)),
)


async def main():
    await pg.init('postgresql://localhost/test')
    await pg.query(DropTable(users))
    await pg.query(CreateTable(users))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

and got an error from asyncpgsa:

Traceback (most recent call last):
  File "/project/main.py", line 20, in <module>
    loop.run_until_complete(main())
  File "/project/.venv/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "/project/main.py", line 15, in main
    await pg.query(DropTable(users))
  File "/project/.venv/lib/python3.6/site-packages/asyncpgsa/pgsingleton.py", line 65, in query
    compiled_q, compiled_args = compile_query(query)
  File "/project/.venv/lib/python3.6/site-packages/asyncpgsa/connection.py", line 60, in compile_query
    compiled_params = sorted(compiled.params.items())
AttributeError: 'NoneType' object has no attribute 'items'

The same code using plain SQLAlchemy works fine:

import sqlalchemy as sa
from sqlalchemy.sql.ddl import CreateTable, DropTable

users = sa.Table(
    'users', sa.MetaData(),
    sa.Column('id', sa.Integer, primary_key=True),
    sa.Column('name', sa.VARCHAR(255)),
)

engine = sa.create_engine('postgresql://localhost/test')
engine.execute(DropTable(users))
engine.execute(CreateTable(users))

Thanks!
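The traceback shows `compiled.params` being None for DDL statements, which have no bind parameters. A minimal guard could look like the sketch below; `FakeCompiled` and `sorted_params` are hypothetical names, and this is not the fix that was actually applied in the library.

```python
class FakeCompiled:
    """Hypothetical stand-in for a compiled statement object."""

    def __init__(self, params):
        self.params = params


def sorted_params(compiled):
    """Return the compiled parameters as sorted (name, value) pairs."""
    # DDL such as CREATE TABLE / DROP TABLE compiles with params set to None.
    params = compiled.params or {}
    return sorted(params.items())
```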

Something wrong with handling default column values

Here: https://github.com/CanopyTax/asyncpgsa/blob/master/asyncpgsa/connection.py#L70

    query.parameters = query.parameters or {}
    for col in query.table.columns:
        attr = getattr(col, attr_name)
        if attr and query.parameters.get(col.name) is None:
            if attr.is_scalar:
                query.parameters[col.name] = attr.arg
            elif col.default.is_callable:
                query.parameters[col.name] = attr.arg({})

An update raises the exception AttributeError: 'NoneType' object has no attribute 'is_callable'

Model is:

@compiles(UTCNow, 'postgresql')
def pg_utcnow(element, compiler, **kw):
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"


class MyModel:
    id = Column(Integer(), primary_key=True)
    created_at = Column(DateTime, server_default=UTCNow())
    updated_at = Column(DateTime, onupdate=UTCNow())

class MyOrg(MyModel, Base):
    __tablename__ = 'org'

    name = Column(String(100), nullable=False)

The failure happened for updated_at.
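The snippet quoted above reads `attr` from the column but then tests `col.default.is_callable`, which fails when the column has an onupdate value and no default. The sketch below checks the same object it read; `FakeDefault`, `FakeColumn` and `apply_defaults` are hypothetical stand-ins, and values that are neither scalar nor callable (for example SQL expressions) are simply left alone.

```python
class FakeDefault:
    """Hypothetical stand-in for a column default or onupdate wrapper."""

    def __init__(self, arg):
        self.arg = arg
        self.is_callable = callable(arg)
        self.is_scalar = not self.is_callable


class FakeColumn:
    """Hypothetical stand-in for a table column."""

    def __init__(self, name, default=None, onupdate=None):
        self.name = name
        self.default = default
        self.onupdate = onupdate


def apply_defaults(columns, params, attr_name):
    """Fill missing parameters from the attribute named by attr_name."""
    params = dict(params or {})
    for col in columns:
        attr = getattr(col, attr_name)
        if attr is None or params.get(col.name) is not None:
            continue
        if attr.is_scalar:
            params[col.name] = attr.arg
        elif attr.is_callable:  # checks attr, not col.default
            params[col.name] = attr.arg({})
    return params
```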

JSON field is string

With a field defined as Column('abc', JSON), I have to use json.loads to get a dict:

rows = await pg.fetch(sql)
async for row in rows:
    print(json.loads(row.abc))

row.abc is a string.
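asyncpg returns json and jsonb values as text unless a codec is registered on the connection. A common workaround is to register one with asyncpg's `Connection.set_type_codec`; the helper name `register_json_codecs` below is an assumption for this sketch, not part of asyncpgsa.

```python
import json


async def register_json_codecs(conn):
    """Ask asyncpg to decode json/jsonb columns into Python objects."""
    for typename in ('json', 'jsonb'):
        await conn.set_type_codec(
            typename,
            encoder=json.dumps,
            decoder=json.loads,
            schema='pg_catalog',
        )
```

asyncpg's `create_pool` accepts an `init` coroutine that runs for every new connection, so passing `init=register_json_codecs` there should apply the codec pool-wide, assuming the wrapper in use forwards that argument.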

bindparam does not work

Perhaps you could advise a quick fix or workaround for this? I need to update many different rows with different values. I previously used bindparam for that.

import asyncio

from asyncpgsa import PG
from sqlalchemy import Table, MetaData, Column, Integer, String, bindparam, \
    create_engine

metadata = MetaData()

table = Table(
    'citizens',
    metadata,
    Column('test_id', Integer, primary_key=True),
    Column('name', String, nullable=False),
)

DB_URL = 'postgresql://user:[email protected]/db'


async def main():
    # create table
    engine = create_engine(DB_URL)
    metadata.create_all(engine)

    # connect to db
    pg = PG()
    await pg.init(DB_URL)
    async with pg.transaction() as conn:
        # create
        query = table.insert().values([
            {'name': str(i)} for i in range(10)
        ]).returning(table)
        rows = await conn.fetch(query)

        # update
        query = table.update().values(name=bindparam('name'))
        await conn.execute(query, [
            {'test_id': row['test_id'], 'name': row['name'] + '_new'}
            for row in rows
        ])

        # check
        # asyncpg.exceptions.NotNullViolationError: null value in column "name" violates not-null constraint
        # DETAIL:  Failing row contains (31, null).
        results = await conn.execute(table.select())
        print(results)

asyncio.run(main())

How to create the database using sqlalchemy's create_all ?

I couldn't find an example anywhere of creating the database. Here is one of my attempts:

import asyncio
import os
import sys

import asyncpgsa
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL

from model import Base
from utils import read_configuration_file


async def main(config, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    db_connection_infos = {
        "drivername": "postgres",
        "host": config["host"],
        "port": config["port"],
        "username": config["username"],
        "password": config["password"],
        "database": config["database"]
    }
    dsn = str(URL(**db_connection_infos))

    db_engine = await create_engine(dsn, echo=True, module=asyncpgsa)
    try:
        Base.metadata.create_all(db_engine)
    except:
        sys.stderr.write("cannot connect to database\n")
        sys.exit(2)
    finally:
        db_engine.dispose()


if __name__ == "__main__":
    config = read_configuration_file()
    if not config:
        sys.exit(1)
    config = config["database"]
    config["password"] = os.getenv("PG_PASS", "") or config["password"]

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(config))

The error is:

Traceback (most recent call last):
  File "create-schema.py", line 46, in <module>
    loop.run_until_complete(main(config))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "create-schema.py", line 28, in main
    db_engine = await create_engine(dsn, echo=True, module=asyncpg)
  File "/home/franck/projets/python/my-own-little-business/venv/lib/python3.5/site-packages/sqlalchemy/engine/__init__.py", line 387, in create_engine
    return strategy.create(*args, **kwargs)
  File "/home/franck/projets/python/my-own-little-business/venv/lib/python3.5/site-packages/sqlalchemy/engine/strategies.py", line 88, in create
    dialect = dialect_cls(**dialect_args)
  File "/home/franck/projets/python/my-own-little-business/venv/lib/python3.5/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 521, in __init__
    PGDialect.__init__(self, **kwargs)
  File "/home/franck/projets/python/my-own-little-business/venv/lib/python3.5/site-packages/sqlalchemy/dialects/postgresql/base.py", line 2083, in __init__
    default.DefaultDialect.__init__(self, **kwargs)
  File "/home/franck/projets/python/my-own-little-business/venv/lib/python3.5/site-packages/sqlalchemy/engine/default.py", line 196, in __init__
    self.paramstyle = self.dbapi.paramstyle
AttributeError: module 'asyncpg' has no attribute 'paramstyle'

I also tried to use the creator param of create_engine. No better: the function being a coroutine, I fear SQLAlchemy does not like this...

`conn.execute(query)` is not working with params

Since version 0.14.0, conn.execute() does not work with queries that carry parameters.

Simple example:

async with app.pool.acquire() as conn:
    query = table.update().values(some_column=123).where(table.c.id == 1)
    await conn.execute(query)

This goes to asyncpgsa/connection.py:


async def execute(self, script, *args, **kwargs) -> str:
    # script, params = compile_query(script, dialect=self._dialect)
    result = await super().execute(script, *args, **kwargs)
    return RecordGenerator(result)

There, *args will be an empty tuple, so the call to super().execute(script, *args, **kwargs), which is asyncpg.Connection.execute(), takes the no-arguments branch:

async def execute(self, query: str, *args, timeout: float=None) -> str:
    ...
    if not args:  # <- WILL ENTER HERE
        return await self._protocol.query(query, timeout)

    _, status, _ = await self._execute(query, args, 0, timeout, True)
    return status.decode()

With version 0.13.0 everything is fine, because *params and *args are both passed to asyncpg.Connection.execute(), which then skips the if not args clause:

async def execute(self, script, *args, **kwargs) -> str:
    script, params = compile_query(script, dialect=self._dialect)
    result = await self._connection.execute(script, *params, *args, **kwargs)
    return RecordGenerator(result)

I am certainly grateful for this awesome package and I am willing to help, but I am not sure how, because the recent change to omit compile_query in execute was presumably made for a reason, and I don't really understand what that reason is.

For the moment I have rolled back to 0.13.0 and everything works as expected.

Record proxy with type result processors

Regarding #42 (which applies the type bind processors when adding parameters to the queries), I found that the reverse direction is not working yet: the type result processors (both for user-defined type decorators and dialect-specific types) are not invoked when reading the fetched rows.

I tried to fix this by myself, but it seems to be much more complicated than I first thought:

  • It requires a major rewrite of the current SAConnection class so that compiled._result_columns are passed around. This is not trivial because we currently subclass asyncpg.connection.Connection directly and extend its API by overrides, so we need to know which methods are invoked by whom (either the base class or our class).
  • Record and RecordGenerator classes should have references to both the compiled._result_columns and _dialect.dbapi_type_map. So Record is no longer just a thin wrapper of the given "row" dict but an active post-processor of row fields, like in aiopg.
  • Whenever accessing the row values, we need to lookup and apply result processors based on the above two maps.
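The last step in the list above, applying result processors when row values are read, can be sketched independently of SQLAlchemy. In this illustration `processors` is a hypothetical mapping from column name to a conversion function; how that mapping would be derived from compiled._result_columns is exactly the hard part described above and is not shown.

```python
from datetime import datetime, timezone


def apply_result_processors(row, processors):
    """Return a copy of row with each value passed through its processor."""
    processed = {}
    for name, value in row.items():
        processor = processors.get(name)
        if processor is not None and value is not None:
            value = processor(value)
        processed[name] = value
    return processed


# Example processor: turn a timezone-aware datetime into a Unix timestamp.
def to_timestamp(value):
    return int(value.timestamp())
```

For instance, `apply_result_processors({'some_date': datetime(2020, 1, 1, tzinfo=timezone.utc)}, {'some_date': to_timestamp})` yields an integer for some_date, which mirrors what a process_result_value callback would produce.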

For custom types, maybe I need to fall back to aiopg for more complete SA functionality due to my time limits. It is unfortunate not to have public/documented SQLAlchemy APIs to make low-level adaptor libraries like asyncpgsa and aiopg. 😢

Please use minimum version specifier (>=), not compatible specifier (~=)

I'm suffering from an asyncpg bug (MagicStack/asyncpg#158) which is already fixed in v0.13.0.

But since asyncpgsa forces a version compatible with v0.12.0, I'm still stuck with the bug.
Please use a minimum version specifier in setup.py, like this:

#!/usr/bin/env python3
from setuptools import setup

setup(
    name='asyncpgsa',
    version=__import__('asyncpgsa').__version__,
    install_requires=[
        'asyncpg >=0.12.0, <1.0',
        'sqlalchemy',
    ],
    packages=['asyncpgsa', 'asyncpgsa.testing'],
    url='https://github.com/canopytax/asyncpgsa',
    license='Apache 2.0',
    author='nhumrich',
    author_email='[email protected]',
    description='sqlalchemy support for asyncpg'
)
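The difference between the two specifiers can be checked by hand. Under PEP 440, `~=0.12.0` means `>=0.12.0, ==0.12.*`, so 0.13.0 is excluded, while `>=0.12.0, <1.0` admits it. The helpers below are a simplified sketch that only understands plain numeric release segments (no pre-releases or epochs); the function names are hypothetical.

```python
def parse(version):
    """Split a plain 'X.Y.Z' version string into a tuple of ints."""
    return tuple(int(part) for part in version.split('.'))


def matches_compatible(version, spec):
    """Check 'version ~= spec': at least spec, and same prefix minus the last segment."""
    candidate, base = parse(version), parse(spec)
    prefix = base[:-1]
    return candidate >= base and candidate[:len(prefix)] == prefix


def matches_range(version, minimum, upper):
    """Check 'version >= minimum, < upper'."""
    return parse(minimum) <= parse(version) < parse(upper)
```

Here `matches_compatible('0.13.0', '0.12.0')` is False, while `matches_range('0.13.0', '0.12.0', '1.0')` is True, which is the behaviour the request is after.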

Unable to insert into a table with JSONB

Hi,

I'm facing some problems when I insert a JSONB value into a table. I get this error:

following exception:  'StrCompileDialect' object has no attribute '_json_serializer'
Traceback (most recent call last):
  File "/Users/alejandrorosas/.pyenv/versions/raven/lib/python3.6/site-packages/sqlalchemy/sql/type_api.py", line 456, in _cached_bind_processor
    return dialect._type_memos[self]['bind']
  File "/Users/alejandrorosas/.pyenv/versions/3.6.2/lib/python3.6/weakref.py", line 394, in __getitem__
    return self.data[ref(key)]
KeyError: <weakref at 0x10d9c54a8; to 'JSONB' at 0x10d91f2e8>

In version 0.14.2 it was working correctly, so maybe there's an issue with the latest version.

Thanks!

`ConnectionTransactionContextManager` slowly drains the pool

We are using transactions as follows

async with pool.transaction() as conn:
    ...

and from time to time (in our use case, anywhere from once a day to once a week) our service freezes. When we investigated, we noticed that all connections in the pool were marked as used and no free connection was available. We traced the problem to the ConnectionTransactionContextManager.

If a cancellation or timeout is raised during __aenter__ or __aexit__, there is no guarantee that the connection is returned to the pool, so the pool slowly drains. We confirmed that this is the issue, because we changed the code to this

async with pool.acquire() as conn:
    async with conn.transaction():
        ....

and the problem stopped.

I can create a PR to fix this if you like.
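The guarantee being asked for is that the connection goes back to the pool whatever happens while the transaction starts or ends. A minimal sketch of that shape, using try/finally around everything after the acquire, is shown below. `FakePool`, `FakeTransaction` and the `begin` callback are hypothetical stand-ins, and a real implementation would probably also need to protect the release itself from cancellation.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical pool that only counts checked-out connections."""

    def __init__(self):
        self.in_use = 0

    async def acquire(self):
        self.in_use += 1
        return object()

    async def release(self, conn):
        self.in_use -= 1


class FakeTransaction:
    """Hypothetical transaction handle that records how it ended."""

    def __init__(self):
        self.state = "started"

    async def commit(self):
        self.state = "committed"

    async def rollback(self):
        self.state = "rolled back"


@asynccontextmanager
async def transaction(pool, begin):
    """Acquire a connection, run a transaction, and always release the connection."""
    conn = await pool.acquire()
    try:
        tx = await begin(conn)  # may raise, time out, or be cancelled
        try:
            yield conn
        except BaseException:
            await tx.rollback()
            raise
        else:
            await tx.commit()
    finally:
        # Runs even when begin(), commit(), or rollback() fails.
        await pool.release(conn)
```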

Defaults are not handled in multirow inserts

execute_defaults assumes that query.parameters can't be a list, but it is a list in a multirow insert like:

q = table.insert().values([
    {'val': 12},
    {'val': 13},
])
await conn.execute(q)
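Handling both shapes comes down to applying the same per-row logic to either one dict or a list of dicts. The sketch below shows that branching only; `fill_defaults` is a hypothetical helper, and `defaults` is an assumed plain mapping of column name to a value or zero-argument callable, not SQLAlchemy's own default objects.

```python
def fill_defaults(parameters, defaults):
    """Apply column defaults to one row (dict) or to many rows (list of dicts)."""

    def fill_row(row):
        filled = dict(row or {})
        for name, default in defaults.items():
            if filled.get(name) is None:
                # Callables are invoked per row; anything else is used as-is.
                filled[name] = default() if callable(default) else default
        return filled

    if isinstance(parameters, (list, tuple)):
        return [fill_row(row) for row in parameters]
    return fill_row(parameters)
```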

'PGDDLCompiler' object has no attribute '_bind_processors'

My envs

- Python 3.6.3
- asyncpgsa==0.18.1
- asyncpg==0.12.0
- sqlalchemy==1.1.15

My code sample

from asyncpgsa import PG
from app import app

DB_CONFIG = {
    'host': '172.17.0.3',
    'port': 5432,
    'database': '***',
    'user': 'xtpids',
    'password': '***',
    'min_size': 5,
    'max_size': 10,
}

@app.listener('before_server_start')
async def init_db(*args, **kwargs):
    # Initializing a db singleton before server start
    pg = PG()
    await pg.init(**DB_CONFIG)
    app.db = pg
    return app.db
# Create tables in some script
tables = Base.metadata.tables
for name, table in tables.items():
    create_expr = CreateTable(table)
    await app.db.execute(create_expr)

The exception

...
  File "/home/wonder/PyEnvs/xtpids-BKbQCeJj/lib/python3.6/site-packages/asyncpgsa/pgsingleton.py", line 82, in execute
    return await conn.execute(*args, **kwargs)
  File "/home/wonder/PyEnvs/xtpids-BKbQCeJj/lib/python3.6/site-packages/asyncpgsa/connection.py", line 105, in execute
    script, params = compile_query(script, dialect=self._dialect)
  File "/home/wonder/PyEnvs/xtpids-BKbQCeJj/lib/python3.6/site-packages/asyncpgsa/connection.py", line 83, in compile_query
    params = _get_keys(compiled)
  File "/home/wonder/PyEnvs/xtpids-BKbQCeJj/lib/python3.6/site-packages/asyncpgsa/connection.py", line 43, in _get_keys
    processors = compiled._bind_processors
AttributeError: 'PGDDLCompiler' object has no attribute '_bind_processors'

My analysis

In sqlalchemy's source, only SQLCompiler has _bind_processors property.
However create_expr.compile(dialect=asyncpgsa.connection._dialect) generates an instance of type PGDDLCompiler(DDLCompiler), of which SQLCompiler isn't a base class.

Do we need Pool.close()?

I'm using asyncpgsa heavily at the moment, and I came up with one question.

If I want to close all connections in the Pool, can I use Pool.close() from asyncpg's Pool?

For example, I may want to create the asyncpgsa singleton object when the server starts and gracefully release the Pool's connections when the server stops.

I will try to open a PR adding this functionality.
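asyncpg's Pool provides a `close()` coroutine for graceful shutdown and a synchronous `terminate()` for an immediate one, so a pool that subclasses it should inherit both. A shutdown hook along those lines might look like the sketch below; `shutdown_pool` and its timeout value are assumptions for illustration, and how the pool is reached from the asyncpgsa singleton is not shown.

```python
import asyncio


async def shutdown_pool(pool, timeout=10.0):
    """Close the pool gracefully; fall back to terminate() if that takes too long."""
    try:
        await asyncio.wait_for(pool.close(), timeout)
    except asyncio.TimeoutError:
        # Connections still busy after the timeout are dropped immediately.
        pool.terminate()
```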

Support Sequence

Hi, I am using Sequence in my models.

SEQUENCE = Sequence('my_sequence', start=1)

class MyTabel(Base):

    __tablename__ = 'tabel_1'
    Base.metadata = metadata

    id_2 = Column(String(36), ForeignKey('tabel_2.id_2 ', ondelete='CASCADE'), primary_key=True)

    id_1 = Column(String(36), ForeignKey('tabel_2.id_1 ', ondelete='CASCADE'), primary_key=True)

    seq_id= Column(Integer,  SEQUENCE )

When I try to insert a new value into the table with the sequence, I get the following error:

  File "C:\Program Files\Python37\lib\site-packages\asyncpg\connection.py", line 433, in fetchval
    data = await self._execute(query, args, 1, timeout)
  File "C:\Program Files\Python37\lib\site-packages\asyncpgsa\connection.py", line 89, in _execute
    query, compiled_args = compile_query(query, dialect=self._dialect)
  File "C:\Program Files\Python37\lib\site-packages\asyncpgsa\connection.py", line 64, in compile_query
    query = execute_defaults(query)  # default values for Insert/Update
  File "C:\Program Files\Python37\lib\site-packages\asyncpgsa\connection.py", line 40, in execute_defaults
    _execute_default_attr(query, query.parameters, attr_name)
  File "C:\Program Files\Python37\lib\site-packages\asyncpgsa\connection.py", line 48, in _execute_default_attr
    if attr.is_scalar:
AttributeError: 'Sequence' object has no attribute 'is_scalar'

The following change fixes it in my case, but I am not sure it is correct.

# old
def _execute_default_attr(query, param, attr_name):
    for col in query.table.columns:
        attr = getattr(col, attr_name)
        if attr and param.get(col.name) is None:
            if attr.is_scalar:
                param[col.name] = attr.arg
            elif attr.is_callable:
                param[col.name] = attr.arg({})

# new
def _execute_default_attr(query, param, attr_name):
    for col in query.table.columns:
        attr = getattr(col, attr_name)
        if attr and param.get(col.name) is None:
            if attr.is_sequence:
                continue
            if attr.is_scalar:
                param[col.name] = attr.arg
            elif attr.is_callable:
                param[col.name] = attr.arg({})

asyncpgsa - Version: 0.25.2
sqlalchemy - Version: 1.2.15

Typings?

Hi,

I was wondering if there were/are typings for the library? I'm trying to reduce the number of mypy exceptions I carve out, and this is one of the ones that mypy yells at me for.

Thanks for the great library!

None result should be None (fetchrow and fetch)

query = ...  # some SA select query
row = await conn.fetchrow(query)  # conn is SAConnection

When the underlying asyncpg connection returns None as the "empty" result, row should become None as well.
Currently, row is not None but row.row is None.
This would confuse users who expect the API semantics to be the same as asyncpg's.

The same applies to fetch and execute, which return a RecordGenerator, because it does not check for None when instantiating Record objects.

Q: What is the purpose of the seemingly redundant Record and RecordGenerator classes?
They look like a no-op proxy to asyncpg's return objects.
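
The requested behaviour could look roughly like the sketch below. Record here is a hypothetical stand-in for the proxy class mentioned in this issue, not the library's actual implementation, and wrap_row and wrap_rows are made-up helper names.

```python
class Record:
    """Hypothetical stand-in for the proxy class discussed in this issue."""

    def __init__(self, row):
        self.row = row


def wrap_row(row):
    # Mirror asyncpg: an empty result stays None instead of becoming Record(None).
    return None if row is None else Record(row)


def wrap_rows(rows):
    # Skip the wrapping entirely when there is nothing to wrap.
    return [] if rows is None else [wrap_row(r) for r in rows]


empty = wrap_row(None)
found = wrap_row({"id": 1})
```

With this check, a caller can write `if row is None:` exactly as it would with a plain asyncpg connection.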

Incorrect processing of intervals (aka timedelta)

Hello!

In MagicStack/asyncpg#181 I reported a bug with processing timedelta instances.

In short, any query that works with intervals fails:

q = sqlalchemy \
    .select([sqlalchemy.func.count()]) \
    .select_from(TASK) \
    .where(sqlalchemy.func.NOW() - TASK.c.last_update > timedelta(seconds=10))

async with app.connection() as c:
    print(await c.fetchval(q))

It turns out that this bug happens because _dialect on SAConnection is actually None when it should be pypostgresql.dialect(). For some reason, __init__ of SAConnection is commented out (see 8c02eb1), and I don't see why. Does anyone know anything about it? Was there any reason to switch to the default dialect?

I'll try to send a PR to fix this (maybe with some tests added).

Dependency on old version of asyncpg

When using asyncpgsa it's hard to use the latest version of asyncpg, because install_requires pins asyncpg~=0.12.0.
What's more, I'd like to use asyncpgsa only as a SQL query compiler, without using its context managers (as shown here). In that case asyncpg is, in practice, no longer a dependency of asyncpgsa, which acts only as a query compiler.
How about moving asyncpg to extras_require? Or splitting the library into two separate packages: a compiler and an asyncpg adapter (any better name?).

Register asyncpgsa dialect in SQLAlchemy?

I found that registering the dialect (plus installing the py-postgresql dependency) automatically allows us to use the SQLAlchemy ORM.

from sqlalchemy.dialects import registry

# this line makes asyncpgsa act as the psycopgsa:// wrapper
registry.register("psycopgsa", "asyncpgsa.connection", "_dialect")

The dbapi also needs to be overridden.

Version

Shouldn't the version.py file indicate version 0.23.0?

Asyncpg connection are not returning to a pool

An asyncpg connection is never returned to the pool if an exception is raised in __aenter__ after the connection has been acquired through acquire_context.

async def __aenter__(self):
    self.acquire_context = self.pool.acquire(timeout=self.timeout)
    con = await self.acquire_context.__aenter__()
    self.transaction = con.transaction(**self.trans_kwargs)
    await self.transaction.__aenter__()
    return con

A CancelledError may be raised while awaiting transaction.__aenter__, after acquire_context has already handed out the connection. In that case the connection is never returned to the pool.
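
One possible way to close this gap is to release the connection when starting the transaction fails. This is a sketch, not the library's code: SafeTransactionContext and all Fake* classes are hypothetical stand-ins so the example runs without a database, and the fake transaction raises CancelledError on purpose to imitate the reported situation.

```python
import asyncio
import sys


class SafeTransactionContext:
    """Sketch of a context manager that releases the connection on failure."""

    def __init__(self, pool, timeout=None, **trans_kwargs):
        self.pool = pool
        self.timeout = timeout
        self.trans_kwargs = trans_kwargs

    async def __aenter__(self):
        self.acquire_context = self.pool.acquire(timeout=self.timeout)
        con = await self.acquire_context.__aenter__()
        try:
            self.transaction = con.transaction(**self.trans_kwargs)
            await self.transaction.__aenter__()
        except BaseException:
            # CancelledError derives from BaseException on Python 3.8+,
            # so a plain "except Exception" would miss it.
            await self.acquire_context.__aexit__(*sys.exc_info())
            raise
        return con

    async def __aexit__(self, exc_type, exc, tb):
        try:
            await self.transaction.__aexit__(exc_type, exc, tb)
        finally:
            await self.acquire_context.__aexit__(exc_type, exc, tb)


class FakeTransaction:
    async def __aenter__(self):
        raise asyncio.CancelledError()  # simulate cancellation while starting

    async def __aexit__(self, *exc):
        return False


class FakeConnection:
    def transaction(self, **kwargs):
        return FakeTransaction()


class FakeAcquire:
    def __init__(self, pool):
        self.pool = pool

    async def __aenter__(self):
        self.pool.in_use += 1
        return FakeConnection()

    async def __aexit__(self, *exc):
        self.pool.in_use -= 1
        return False


class FakePool:
    def __init__(self):
        self.in_use = 0

    def acquire(self, timeout=None):
        return FakeAcquire(self)


async def demo():
    pool = FakePool()
    try:
        async with SafeTransactionContext(pool):
            pass
    except asyncio.CancelledError:
        pass
    return pool.in_use


leaked = asyncio.run(demo())
```

The counter on the fake pool goes back to zero after the failed enter, which is the behaviour the issue asks for.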

Add an example involving range types

I have a table like the following:

CREATE TABLE users (
  id SERIAL,
  name VARCHAR(15),
  active DATERANGE,

  PRIMARY KEY (id)
)

that with plain SQLAlchemy+psycopg2 I can query with:

    today = date.today()
    q = select([users.c.id]) \
        .where(users.c.name == 'test') \
        .where(users.c.active.contains(today))
    r = session.execute(q)

With the following asyncpgsa transliteration:

    today = date.today()
    q = sa.select([users.c.id]) \
        .where(users.c.name == 'test') \
        .where(users.c.active.contains(today))
    r = await pg.fetchrow(q)

I get an error:

/usr/local/lib/python3.5/site-packages/asyncpgsa/pgsingleton.py:65: in fetchrow
    return await conn.fetchrow(query, *args, timeout=timeout)
/usr/local/lib/python3.5/site-packages/asyncpgsa/connection.py:91: in fetchrow
    result = await self.connection.fetchrow(query, *params, *args, **kwargs)
/usr/local/lib/python3.5/site-packages/asyncpg/connection.py:259: in fetchrow
    False, timeout)
asyncpg/protocol/protocol.pyx:157: in bind_execute (asyncpg/protocol/protocol.c:45856)
    ???
asyncpg/protocol/prepared_stmt.pyx:122: in asyncpg.protocol.protocol.PreparedStatementState._encode_bind_msg (asyncpg/protocol/protocol.c:42239)
    ???
asyncpg/protocol/codecs/base.pyx:123: in asyncpg.protocol.protocol.Codec.encode (asyncpg/protocol/protocol.c:12276)
    ???
asyncpg/protocol/codecs/base.pyx:90: in asyncpg.protocol.protocol.Codec.encode_range (asyncpg/protocol/protocol.c:11868)
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   ???
E   TypeError: list, tuple or Range object expected (got type <class 'datetime.date'>)

asyncpg/protocol/codecs/range.pyx:53: TypeError

which seems odd to me: what I want is to test whether a single date is within a period...

I tried the following code, passing a one-item-tuple instead:

    today = date.today()
    q = sa.select([users.c.id]) \
        .where(users.c.name == 'test') \
        .where(users.c.active.contains((today,)))
    r = await pg.fetchrow(q)

that works, but that seems to convert the argument to a [today, +infinity] range, something different from what I'm seeking.

What am I missing?

pip failed to install package

pip can't install asyncpgsa if it is listed in the same requirements file as asyncpg (in other words, asyncpg has to be installed before asyncpgsa can be installed). Is there a way to change how the version is determined?

Collecting asyncpg==0.12.0 (from -r .meta/packages (line 5))
....
Collecting asyncpgsa==0.18.1 (from -r .meta/packages (line 6))
Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-build-gcdgh8ov/asyncpgsa/setup.py", line 6, in <module>
        version=__import__('asyncpgsa').__version__,
      File "/tmp/pip-build-gcdgh8ov/asyncpgsa/asyncpgsa/__init__.py", line 1, in <module>
        from .pool import create_pool
      File "/tmp/pip-build-gcdgh8ov/asyncpgsa/asyncpgsa/pool.py", line 3, in <module>
        import asyncpg
    ModuleNotFoundError: No module named 'asyncpg'

Connect Error When using dsn and password contains '#'

When I create a pool from a DSN and my database password is "dY8*6fN6Z#xSOg$wG9zDATTe":
pool = await pg.create_pool(dsn)
it raises an error:

hostlist_ports.append(int(hostspec_port))
ValueError: invalid literal for int() with base 10: 'dY8*6fN6Z'
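
The error message suggests that the "#" in the password is read as the start of a URL fragment, so the text after the first ":" is taken for a port. A minimal sketch of one workaround, assuming the DSN is assembled by hand, is to percent-encode the password before embedding it. The user, host, and database names below are hypothetical, and the standard library's urlsplit is used only to illustrate the splitting; asyncpg's own DSN parser may differ in detail.

```python
from urllib.parse import quote, unquote, urlsplit

password = "dY8*6fN6Z#xSOg$wG9zDATTe"

# Hypothetical user, host and database names, for illustration only.
raw_dsn = f"postgresql://user:{password}@localhost:5432/mydb"
safe_dsn = f"postgresql://user:{quote(password, safe='')}@localhost:5432/mydb"

# In the raw DSN everything after "#" ends up in the URL fragment.
raw_parts = urlsplit(raw_dsn)

# In the encoded DSN the host, port and password are split as intended.
safe_parts = urlsplit(safe_dsn)
decoded_password = unquote(safe_parts.password)
```

Passing the credentials as separate keyword arguments (host, user, password, database) instead of a DSN avoids the URL parsing altogether.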

Question about nested transaction (savepoint)

Hello, I'm trying to use a nested transaction in a pytest fixture to get automatic rollback in the test environment. I noticed that asyncpg supports savepoints, but I could not make it work.

The following is code snippet:

class ExplilcitRollbackException(Exception):
    pass


@pytest.fixture
async def pg_pool(loop, pg_setting, setup_databse):
    from asyncpgsa import create_pool
    pool = await create_pool(**pg_setting, loop=loop)

    try:
        async with pool.transaction():
            yield pool
            raise ExplilcitRollbackException
    except ExplilcitRollbackException:
        pass
    finally:
        await pool.close()

Is there a way to achieve this, or can I only truncate the tables?

Issues casting a parameter

Running the following query results in a traceback. Seems it cannot handle casting parameters.

sql = '''
    SELECT date::date 
    FROM generate_series(:start_month::date, :end_month::date, '1 month'::interval)
'''
now = datetime.datetime.now()
start_month = datetime.datetime(now.year, now.month, 1)
end_month = (start_month + datetime.timedelta(days=32)).replace(day=1)  # first day of next month, also valid in December
query = sa.text(sql).params(start_month=start_month, end_month=end_month)
await conn.fetch(query)

Traceback (most recent call last):
...
  File "/python3.6/site-packages/asyncpgsa/connection.py", line 99, in _execute
    query, compiled_args = compile_query(query, dialect=self._dialect)
  File "/python3.6/site-packages/asyncpgsa/connection.py", line 83, in compile_query
    params = _get_keys(compiled)
  File "/python3.6/site-packages/asyncpgsa/connection.py", line 52, in _get_keys
    raise MissingParameterError('Parameter {} missing'.format(e))
asyncpgsa.connection.MissingParameterError: "Parameter 'start_month' missing"
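
A commonly suggested workaround, not verified against this asyncpgsa version, is to write the cast in the standard CAST(... AS ...) form, so that a ":name" placeholder is never followed directly by "::type". The sketch below only builds the SQL string and the two dates; the "AS date" alias and the helper name first_of_next_month are assumptions added for illustration.

```python
import datetime

# CAST(:name AS date) keeps the bind placeholder apart from the "::" operator.
# The "AS date" alias is an assumption so that the "date" column resolves.
SQL = """
    SELECT date::date
    FROM generate_series(
        CAST(:start_month AS date),
        CAST(:end_month AS date),
        '1 month'::interval
    ) AS date
"""


def first_of_next_month(day):
    """Return the first day of the month after the given date."""
    first = day.replace(day=1)
    # 32 days after the 1st always lands in the following month.
    return (first + datetime.timedelta(days=32)).replace(day=1)


today = datetime.date(2023, 12, 15)  # fixed date so the result is reproducible
start_month = today.replace(day=1)
end_month = first_of_next_month(today)

# With SQLAlchemy available, the query would then be built as in the report:
# query = sa.text(SQL).params(start_month=start_month, end_month=end_month)
```

The date helper also sidesteps the ValueError that `now.month + 1` raises in December.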

Func with string params, IndeterminateDatatypeError

Hi!
Thank you for asyncpgsa.

I have some problems when using sql.func with string arguments.
I am trying to use the func.jsonb_build_object('key_name', field.key) construction.

Actually, the simple example to reproduce this error is :

query = select([func.count('*')]).select_from(table)
records = await conn.fetch(query)

  File "asyncpg/protocol/protocol.pyx", line 163, in prepare
asyncpg.exceptions.IndeterminateDatatypeError: could not determine data type of parameter $1

TypeError: object of type 'Select' has no len()

I took this as an example:

import asyncio
import asyncpgsa

import sqlalchemy as sa
pg_tables = sa.Table(
        'pg_tables', sa.MetaData(),
        sa.Column('schemaname'),
        sa.Column('tablename'),
        sa.Column('tableowner'),
        sa.Column('tablespace'),
        sa.Column('hasindexes')
    )

async def main():
    pool = await asyncpgsa.create_pool(
        host=?,
        port=?,
        database=?,
        user=?,
        # loop=event_loop,
        password="5800149687",
        min_size=5,
        max_size=10,
        max_inactive_connection_lifetime=60,  # What is this? Why is it mandatory?
    )
    query = pg_tables.select().where(pg_tables.c.schemaname == 'pg_catalog')
    async with pool.acquire() as conn:
        for row in await conn.fetch(query):
            a = row.col_name


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

and get this:

Traceback (most recent call last):
  File "test.py", line 37, in <module>
    loop.run_until_complete(main())
  File "/home/valentin/.pyenv/versions/3.6.1/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "test.py", line 32, in main
    for row in await conn.fetch(query):
  File "/home/valentin/.pyenv/versions/tesla-much/lib/python3.6/site-packages/asyncpg/connection.py", line 340, in fetch
    return await self._execute(query, args, 0, timeout)
  File "/home/valentin/.pyenv/versions/tesla-much/lib/python3.6/site-packages/asyncpg/connection.py", line 651, in _execute
    return await self._do_execute(query, executor, timeout)
  File "/home/valentin/.pyenv/versions/tesla-much/lib/python3.6/site-packages/asyncpg/connection.py", line 662, in _do_execute
    stmt = await self._get_statement(query, None)
  File "/home/valentin/.pyenv/versions/tesla-much/lib/python3.6/site-packages/asyncpg/connection.py", line 276, in _get_statement
    len(query) > self._max_cacheable_statement_size):
TypeError: object of type 'Select' has no len()

I think asyncpg changed something internally. I need a workaround or a fix, or maybe I made a mistake somewhere.
