encode / databases
Async database support for Python.
Home Page: https://www.encode.io/databases/
License: BSD 3-Clause "New" or "Revised" License
Hey! Thanks for this awesome library.
I'm replacing direct aiopg usage with it, but I ran into one issue that I can't seem to solve. When executing a query, aiopg returns an object with a .rowcount attribute. This is quite handy when using an INSERT ... ON CONFLICT query to know how many rows have been inserted.
I can't find out how to expose this information with this library. I'm guessing it might be more of an issue with asyncpg, but I was wondering whether this is possible and whether it could be documented somewhere.
In particular:
databases API.
Currently I maintain a port for databases. I think it would be better to add a note to the README about Python 3.5 support. What do you think?
The same goes for starlette.
my version: 0.2.2
asyncpg version: 0.18.3
functions: database.fetch_one, database.fetch_all
dict(user_row), when user_row is a databases.backends.postgres.Record object, returns an empty dict for me when using Postgres through asyncpg.
dict(user_row.__dict__['_row']) works, so I know that at least in my case the record has loaded successfully and there is a dict somewhere. Dict-style access such as user_row['id'] works just fine.
Is there any way to use a RETURNING clause on insert to get all the defaults for columns that are set on the DB server? The SQLAlchemy Core documentation describes a method, return_defaults() (https://docs.sqlalchemy.org/en/13/core/dml.html#sqlalchemy.sql.expression.ValuesBase.return_defaults), which, if applied, should return all columns that were not provided and had a default set.
According to issue #55, database.execute returns only a pointer to the last record (usually the id of the inserted record), so executing this doesn't work:
stmt = table.insert().values(data='newdata').return_defaults()
result = await database.execute(stmt)
server_created_at = result.returned_defaults['created_at']
Is there any way around it or is it absolutely necessary to make two calls to the database to get newly inserted record with all defaults assigned?
We should lazily establish the connection pool as needed.
Hi,
are you planning on supporting SQLAlchemy ORM?
This would be very helpful for users with less knowledge about database design.
Hi all,
I've started messing around with Databases to see how it could work with Bocadillo. It looks quite promising, but there are a few issues I encountered while getting started.
Being not very familiar with SQLAlchemy, I had quite a hard time finding resources to learn the syntax mentioned in the README (SQLAlchemy Core). I used SQLAlchemy with Flask in the past, but I think it had a different syntax (e.g. db.commit(), etc.), which doesn't seem to be the one used here.
I also found that the SQLAlchemy Core docs the README links to are quite cryptic for newcomers.
Do you think there could be more code examples? Especially about the following:
WHERE clause: notes.select().where(id=1)?
For what it's worth, I'd be glad to help out with this. Does anyone have good introductory resources on SQLAlchemy Core aside from the official docs?
It seems the _build_query method in core does not work with SQLAlchemy Core Delete statements. The Delete object does not have a values attribute, and will throw an exception if you attempt to pass one in:
stmt = table.delete().where(model.id == sa.bindparam('id'))
values = [dict(id=1), dict(id=2), ...]
await session.execute_many(query=stmt, values=values)
Will raise an AttributeError of "'Delete' object has no attribute 'values'" at this line:
https://github.com/encode/databases/blob/master/databases/core.py#L228
A simple workaround is to compile the statement before sending it to the execute_many method:
await session.execute_many(query=str(stmt), values=values)
It'd be nice if the package could interpret the query passed in, and translate it.
There's a small proposal/question about the details for connecting to the DB.
There might be a situation where the host has a complex address; GCP in particular has something like:
/<cloudsql>/<project>:<region>:<db_instance>
Generally, this was solved by passing it as a query param, unix_socket=<host>/.s.PGSQL.5432, to the SQLAlchemy engine.
That doesn't work with asyncpg, because it tries to parse the DB URL and the : character breaks the parsing logic (when such a host is part of the URL or is passed as an environment variable), or the host is an empty string when it's passed as the unix_socket param.
But asyncpg can take connection params separately: https://magicstack.github.io/asyncpg/current/api/index.html#connection. Params take precedence over URL parsing, so they can be used instead.
I haven't come up with a great idea yet, but it might be done as
class PostgresBackend(DatabaseBackend):
    def __init__(self, database_url: typing.Union[DatabaseURL, str], params: typing.Dict) -> None:
        self._database_url = DatabaseURL(database_url)
        ...
        self._params = params

    async def connect(self) -> None:
        assert self._pool is None, "DatabaseBackend is already running"
        kwargs = self._get_connection_kwargs()
        # Explicit params override anything parsed from the URL.
        kwargs.update(self._params)
        self._pool = await asyncpg.create_pool(str(self._database_url), **kwargs)
Would it be possible to consider?
Happy to create a PR with a reasonable solution.
There is a debug log entry for each query within all backends, recording the query itself and its arguments.
The code is similar for Postgres, SQLite and MySQL backends:
logger.debug(query, args)
That fails because logging treats query as a %-style format string and tries to interpolate args into it.
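A minimal sketch of the failure described above, using only the standard library. The query and arguments are made up, and render is a hypothetical helper that builds the same record logger.debug would build and formats it eagerly, so the formatting error surfaces as an exception instead of a message on stderr.

```python
import logging

query = "SELECT * FROM notes WHERE id = $1"
args = [1]


def render(msg, *log_args):
    # Build the record that logger.debug(msg, *log_args) would create
    # and format it immediately.
    record = logging.LogRecord(
        "databases", logging.DEBUG, "example.py", 0, msg, log_args, None
    )
    return record.getMessage()


# Equivalent to logger.debug(query, args): logging evaluates `query % (args,)`.
try:
    render(query, args)
    unsafe_failed = False
except TypeError:
    unsafe_failed = True

# Passing the query and its arguments as values for explicit placeholders is safe.
safe_message = render("Query: %s Args: %s", query, args)
print(unsafe_failed, safe_message)
```

Logging the query through an explicit "%s" placeholder keeps any % characters in the SQL from being interpreted as format specifiers.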
I'm using the following code in a pure SQLAlchemy environment to set up a connection to a PG server:
import os

from sqlalchemy import create_engine

# pg_config and config_dir are defined elsewhere in the application.
pg_url = f'postgresql+psycopg2://app:{pg_config["password"]}@{pg_config["host"]}/app'
pg_certs = {
    'sslcert': f'{config_dir}/client-cert.pem',
    'sslkey': f'{config_dir}/client-key.pem',
    'sslrootcert': f'{config_dir}/server-ca.pem',
}
pg_engine = create_engine(
    pg_url,
    connect_args=pg_certs,
    use_batch_mode=True,
    pool_pre_ping=True,
    echo=bool(os.environ.get('SHOWSQL')),
    pool_reset_on_return='rollback',
)
How can I put this in a syntax which this package understands?
After many years in sync land, I'm trying to write my first API with Starlette/databases, and I'm puzzled by the very first task, which is to run something like the official example here:
https://github.com/encode/databases#queries
It calls await database.connect() at the root level, and I don't see how that is even possible.
Running it raises:
SyntaxError: 'await' outside function
I see that IPython 7.x has some kind of async support, but I don't know how to run that code under IPython either; it gives me the same error.
IPython docs: https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
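For a plain script, one way around the SyntaxError is to move the awaits into a coroutine and hand it to asyncio.run (Python 3.7+). This is only a sketch: FakeDatabase is a hypothetical stand-in for databases.Database so the example runs without a database server.

```python
import asyncio


class FakeDatabase:
    """Hypothetical stand-in for databases.Database, so the sketch runs offline."""

    def __init__(self):
        self.is_connected = False

    async def connect(self):
        self.is_connected = True

    async def disconnect(self):
        self.is_connected = False


async def main():
    database = FakeDatabase()
    await database.connect()  # legal here: we are inside a coroutine
    try:
        return database.is_connected
    finally:
        await database.disconnect()


# asyncio.run starts an event loop and runs the coroutine to completion.
was_connected = asyncio.run(main())
print(was_connected)
```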
I'm not sure whether this is the intended behavior, since iterate() isn't well documented right now, but attempting to use it without creating a transaction first causes asyncpg to throw the following: asyncpg.exceptions.NoActiveSQLTransactionError: cursor cannot be created outside of a transaction.
The issue can easily be reproduced with the following snippet. I haven't tested other backends to see if the same thing occurs with SQLite and MySQL.
from databases import Database
import asyncio

mydb = Database('postgresql://localhost:5433/testdb11')

async def test():
    await mydb.connect()
    async for row in mydb.iterate(query='SELECT * FROM (VALUES (1), (2), (3), (4), (5)) as t(num)'):
        print(row)
    await mydb.disconnect()

loop = asyncio.get_event_loop()
loop.run_until_complete(test())
loop.close()
SQLAlchemy doesn't require the use of transactions to iteratively load the results of a query, and the documentation doesn't specify this constraint, so transactions probably shouldn't be enforced here either as far as the public API for functions goes.
Whenever I have a SQLAlchemy table column with a default value (e.g. Column("verified", Boolean(), default=False)), the insert statement generated by databases sets a null value for the column when that column isn't provided (e.g. db.execute(mytable.insert(), {...}) where "verified" isn't set).
Are there plans to support column defaults in insert statements?
As the title says, the error is thrown when I try to access a column that stores a UUID value. I have Postgres as the backend, and my table definition looks like this:
table = sa.Table(
    "user",
    metadata,
    # Pass the callable uuid4, not uuid4(), so each row gets a fresh default.
    sa.Column("user_id", pg.UUID(as_uuid=True), nullable=False, default=uuid4, primary_key=True),
    sa.Column("name", pg.TEXT, nullable=False),
    sa.Column("created_ts", pg.TIMESTAMP(timezone=True), nullable=False, server_default=sa.func.now())
)
Then, if I try to fetch a record and access the user_id attribute:
user = await database.fetch_one(
    table.select().where(table.c.user_id == user_id)
)
id = user["user_id"]
this throws an exception with the message: 'UUID' object has no attribute 'replace'. Accessing attributes of other types, such as str, bool and datetime, works just fine. It seems the UUID instance is being treated as a string, and the mapper tries to parse it instead of returning it directly (because of pg.UUID(as_uuid=True)).
If I am not missing anything, is there a way to get PG Record values using dot notation instead of dict-like syntax?
For now we have to use something like user['hashed_password'], which goes against the "Readability counts." line of the Zen of Python :)
It would be great to use just user.hashed_password.
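Until something like this exists in the library, a thin wrapper can give attribute access on top of any mapping-style row. This is a minimal sketch: AttrRecord is a hypothetical helper, not part of databases, and the sample row is made up.

```python
from collections.abc import Mapping


class AttrRecord:
    """Hypothetical read-only wrapper exposing mapping keys as attributes."""

    def __init__(self, row: Mapping):
        self._row = row

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._row[name]
        except KeyError:
            raise AttributeError(name) from None

    def __getitem__(self, key):
        # Keep dict-style access working as well.
        return self._row[key]


user = AttrRecord({"id": 1, "hashed_password": "not-a-real-hash"})
print(user.hashed_password, user["id"])
```

Column names that collide with real attributes or start with an underscore would still need dict-style access in this sketch.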
For a couple of years I've used a UUID -> str custom type recipe, which basically revolves around importing and extending UUIDType from sqlalchemy_utils.
The entirety of the recipe is basically:
from sqlalchemy_utils.types import UUIDType

class UUIDString(UUIDType):
    def process_result_value(self, value, dialect):
        if value is None:
            return value
        return str(value)
The only point is to make UUID columns on models convert to the string representation of the UUID, for example during serialization to JSON, because JSON serializers usually don't like bytes :-)
When using one of my existing SQLAlchemy models and trying out this lovely project, I consistently ran into an issue with accessing UUID columns.
Example:
class SomethingModel(Base):
    id = Column(UUIDString, default=lambda: str(uuid.uuid4()), primary_key=True)
    field = Column(String, nullable=False)
In [1]: await db.connect()
In [2]: tbl = SomethingModel.__table__
In [3]: recs = await db.fetch_all(tbl.select())
In [4]: r = recs[0]
In [5]: r['id']
TypeError Traceback (most recent call last)
<ipython-input-3-31a8ef0c7445> in <module>
----> 1 r.get('id')
~/w/my-api/venv/lib/python3.7/_collections_abc.py in get(self, key, default)
658 'D.get(k[,d]) -> D[k] if k in D, else d. d defaults to None.'
659 try:
--> 660 return self[key]
661 except KeyError:
662 return default
~/w/my-api/venv/lib/python3.7/site-packages/databases/backends/postgres.py in __getitem__(self, key)
112
113 if processor is not None:
--> 114 return processor(raw)
115 return raw
116
~/w/my-api/venv/lib/python3.7/site-packages/sqlalchemy/sql/type_api.py in process(value)
1224 if impl_processor:
1225 def process(value):
-> 1226 return process_value(impl_processor(value), dialect)
1227
1228 else:
~/w/my-api/venv/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py in process(value)
900 def process(value):
901 if value is not None:
--> 902 value = bytes(value)
903 return value
904 return process
TypeError: cannot convert 'UUID' object to bytes
After an embarrassingly long debugging session, I got this working by implementing a result_processor method on my custom type, to make sure that process_result_value on my custom type was called at all:
def result_processor(self, dialect, coltype):
    return functools.partial(self.process_result_value, dialect=dialect)
After adding this method my custom type works, but I feel like this can't be the right way to fix my problem. This is probably not an issue with databases, but since I encountered it only after attempting to use it, and as far as I can tell the "standard" process_result_value method was never entered or looked for, I thought I'd ask for help here.
How can I change cursorclass to DictCursor?
Introduce a decorator-style usage for transactions...
@database.transaction()
def homepage(request):
    ...
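One way such a decorator could work is to give the transaction object a __call__ method that wraps a coroutine in async with. The sketch below is only an illustration of that idea: Transaction is a hypothetical stand-in that records calls instead of talking to a database, and the decorated handler is assumed to be an async function.

```python
import asyncio
import functools


class Transaction:
    """Hypothetical stand-in that records begin/commit/rollback calls."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("begin")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Returning None lets any exception propagate after the rollback.
        self.log.append("rollback" if exc_type else "commit")

    def __call__(self, func):
        # Decorator form: run the wrapped coroutine inside `async with self`.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            async with self:
                return await func(*args, **kwargs)

        return wrapper


log = []


@Transaction(log)
async def homepage(request):
    log.append("handler")
    return "ok"


result = asyncio.run(homepage(request=None))
print(result, log)
```

A real implementation would presumably open a fresh transaction per call rather than reuse one object, as this sketch does for brevity.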
should we fix the tag "asnycio" ? :)
may be the same with "mysq"
Hi Tom,
Following #87, I also discovered that https://github.com/encode/databases/blob/master/databases/backends/postgres.py#L109 is breaking my application, because it doesn't respect the actual type and ends up applying the default implementation type of a column.
For instance, if you create a column using UUIDType (from sqlalchemy_utils), it will fail when accessed, because it falls back to the default implementation (BINARY or CHAR) to process it. This means that, while the value in the record is a UUID, the processor tries to convert it to bytes (which fails).
When I comment out that line altogether, meaning no processing is applied and the data is returned raw, my application works fully.
I then noticed that the Postgres backend is the only one not using RowProxy from SQLAlchemy for records, and this issue is an opportunity for me to ask about the rationale for that choice :)
Thanks,
Mention the existing async ORMs, pros/cons.
Explain why databases exists, and what benefits it provides.
Link out to the database drivers that we're relying on.
Currently migrations and table creation are always performed using sync drivers.
At some point we might want to consider how we can properly integrate the async drivers all the way through, in order to have low-dependency installs, and in order to make sure we're using the same drivers both for migration operations and for regular interactions.
There are no performance reasons for doing so, but it'd be nice if we had comprehensive support for the full set of possible database operations.
Hi, is it possible to provide a few samples using the PostgreSQL JSON/JSONB column types? For example, inserting and querying data and, if possible, how to handle these columns inside migrations.
Thanks in advance
We should have a test that ensures that transactions are isolated by task-context.
I'm not exactly sure how to tackle that.
Related: We'll need to use aiocontextvars for the 3.6 backport, not contextvars.
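As a starting point for such a test, here is a standard-library sketch of the property being relied on: each asyncio task works on its own copy of the context, so a value set in one task is invisible to its siblings and to the parent. The current_connection variable and the string values are made up for illustration; this is not the library's test.

```python
import asyncio
import contextvars

# Each task would keep its own "current connection" in a context variable.
current_connection = contextvars.ContextVar("current_connection", default=None)


async def worker(name):
    current_connection.set(f"connection-for-{name}")
    await asyncio.sleep(0)  # yield so the other task runs in between
    return current_connection.get()


async def main():
    # gather wraps each coroutine in a task with a copy of the current context.
    results = await asyncio.gather(worker("a"), worker("b"))
    # The parent task never sees values set inside the child tasks.
    return results, current_connection.get()


results, parent_value = asyncio.run(main())
print(results, parent_value)
```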
We should allow cursored results, using an explicit iterate interface:
async for row in database.iterate(query):
    ...
windows10 64 python3.6.6
databases 0.2.2
aiomysql 0.0.20
import asyncio

import databases
from sqlalchemy import insert, select, update, Column, Integer, SmallInteger
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
database = databases.Database("mysql://root:[email protected]:3306/demo?min_size=1&max_size=20")

class Task(Base):
    __tablename__ = 'task'
    id = Column(Integer, primary_key=True)
    status = Column(SmallInteger, index=True, nullable=False)

async def add_tasks(num):
    tasks = []
    for i in range(num):
        tasks.append({
            'status': 0
        })
    await database.execute_many(insert(Task), values=tasks)

async def task_worker(task):
    print("get task: %s" % task[0])
    await database.execute(update(Task, Task.id == task[0]).values(status=1))
    print("task: %s finished" % task[0])

async def process_tasks():
    tasks = await database.fetch_all(select([Task], Task.status == 0))
    print("get %s tasks" % len(tasks))
    await asyncio.gather(*[task_worker(x) for x in tasks])

async def main():
    await database.connect()
    try:
        await add_tasks(10)
        await process_tasks()
    finally:
        await database.disconnect()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
exception
Traceback (most recent call last):
File "E:/workspace/PycharmProjects/demo/test.py", line 52, in <module>
loop.run_until_complete(main())
File "C:\Anaconda3\lib\asyncio\base_events.py", line 468, in run_until_complete
return future.result()
File "E:/workspace/PycharmProjects/demo/test.py", line 45, in main
await process_tasks()
File "E:/workspace/PycharmProjects/demo/test.py", line 38, in process_tasks
await asyncio.gather(*[task_worker(x) for x in tasks])
File "E:/workspace/PycharmProjects/demo/test.py", line 31, in task_worker
await database.execute(update(Task, Task.id == task[0]).values(status=1))
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\core.py", line 123, in execute
return await connection.execute(query, values)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\core.py", line 206, in execute
return await self._connection.execute(self._build_query(query, values))
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\databases\backends\mysql.py", line 132, in execute
await cursor.execute(query, args)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\cursors.py", line 239, in execute
await self._query(query)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\cursors.py", line 457, in _query
await conn.query(q)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 428, in query
await self._read_query_result(unbuffered=unbuffered)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 622, in _read_query_result
await result.read()
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 1105, in read
first_packet = await self.connection._read_packet()
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 561, in _read_packet
packet_header = await self._read_bytes(4)
File "E:\workspace\PycharmProjects\demo\venv\lib\site-packages\aiomysql\connection.py", line 598, in _read_bytes
data = await self._reader.readexactly(num_bytes)
File "C:\Anaconda3\lib\asyncio\streams.py", line 674, in readexactly
yield from self._wait_for_data('readexactly')
File "C:\Anaconda3\lib\asyncio\streams.py", line 452, in _wait_for_data
'already waiting for incoming data' % func_name)
RuntimeError: readexactly() called while another coroutine is already waiting for incoming data
maybe related to #81
.fetchall() and .fetchone() should return proper Record interface types.
SQLAlchemy Core supports a returning clause: https://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#insert-update-returning
Because database.execute returns None, and database.fetch* do not support values, there does not seem to be any way to execute an update or insert that returns columns.
I can try to create a pull request for this if you can specify what the API should be. Perhaps optional values on select, or database.returning(query, values).
I'm trying to set these parameters on the latest version available on pip, with no success:
await asyncio.wait_for(connected, loop=loop, timeout=timeout)
File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py", line 412, in wait_for
return fut.result()
asyncpg.exceptions.UndefinedObjectError: unrecognized configuration parameter "max_size"
ERROR: Application startup failed. Exiting.
Hi,
It appears that databases only accepts a boolean to indicate whether you want to connect to PostgreSQL over TLS. But the asyncpg driver supports passing your own context, which is necessary when you have your own certs.
This makes integration a showstopper in some environments (ours relies on TLS internally).
Was there a specific reason for not allowing a context to be passed?
Cheers,
Implement connection pooling for SQLite
Testcase:
def test_replace_database_url_components_sqlite():
    u = DatabaseURL("sqlite:///mydatabase.db")
    assert u.database == "mydatabase.db"
    new = u.replace(database="test_" + u.database)
    assert new.database == "test_mydatabase.db"
    assert str(new) == "sqlite:///test_mydatabase.db"
Pytest traceback:
E AssertionError: assert 'sqlite:/test_mydatabase.db' == 'sqlite:///test_mydatabase.db'
E - sqlite:/test_mydatabase.db
E + sqlite:///test_mydatabase.db
E ? ++
If I find a solution I'll make a PR. But so far I couldn't make it work
Hi,
Thanks for the awesome package (happy to use it with starlette). For rapid development, I'm relying on the SQLite backend, and I'm seeing odd behavior.
When I create tables with an in-memory DB, those tables can never be found in sqlite_master afterwards. But when I use a file database, it works fine.
Is there any gotcha for in-memory SQLite I should be aware of?
Cheers,
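One possible explanation, not confirmed in this thread: in SQLite every new connection to ":memory:" opens its own private, empty database, so tables created on one connection are invisible to any other. The standard-library sketch below shows that behavior with plain sqlite3; whether the backend actually opens more than one connection is an assumption here.

```python
import sqlite3

# Two separate connections to ":memory:" open two unrelated, empty databases.
first = sqlite3.connect(":memory:")
second = sqlite3.connect(":memory:")

first.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, text TEXT)")


def table_names(connection):
    rows = connection.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return [name for (name,) in rows]


tables_seen_by_first = table_names(first)
tables_seen_by_second = table_names(second)
print(tables_seen_by_first, tables_seen_by_second)

first.close()
second.close()
```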
The default asyncpg pool size is 10. It'd be nice to be able to specify the pool size (and other pool options) through the databases package, either at initialization or in the .connect() call:
https://magicstack.github.io/asyncpg/current/api/index.html#asyncpg-api-pool
There seems to be no option for passing kwargs through to the backend.connect() call:
https://github.com/encode/databases/blob/master/databases/backends/postgres.py#L38
I'm not sure how that would affect MySQL and SQLite, but they probably have similar kwargs that would be useful.
We'll want integration examples with a few frameworks.
Certainly at least Starlette and Sanic.
Perhaps also aiohttp and Responder.
Versions:
Python==3.7.2
databases==0.1.7
aiosqlite==0.9.0
sqlalchemy==1.2.18
I have a table notes in SQLite, and I run the code below:
query = notes.select().where(notes.c.id == note_id)
the_note = await database.fetch_one(query)
When the notes table doesn't have a row whose id equals note_id, it raises:
...
File "/Users/xxx/.local/share/virtualenvs/rose-Xd4LdsnO/lib/python3.7/site-packages/databases/core.py", line 97, in fetch_one
return await connection.fetch_one(query=query)
File "/Users/xxx/.local/share/virtualenvs/rose-Xd4LdsnO/lib/python3.7/site-packages/databases/core.py", line 163, in fetch_one
return await self._connection.fetch_one(query=query)
File "/Users/xxx/.local/share/virtualenvs/rose-Xd4LdsnO/lib/python3.7/site-packages/databases/backends/sqlite.py", line 102, in fetch_one
return RowProxy(metadata, row, metadata._processors, metadata._keymap)
It seems that RowProxy doesn't accept an empty row as a parameter. We can fix it by adding two lines in databases/backends/sqlite.py:
async with self._connection.execute(query, args) as cursor:
    row = await cursor.fetchone()
+   if row is None:
+       return
    metadata = ResultMetaData(context, cursor.description)
    return RowProxy(metadata, row, metadata._processors, metadata._keymap)
I also found that the MySQL backend has similar code in databases/backends/mysql.py:
try:
    await cursor.execute(query, args)
    row = await cursor.fetchone()
    metadata = ResultMetaData(context, cursor.description)
    return RowProxy(metadata, row, metadata._processors, metadata._keymap)
finally:
    await cursor.close()
So I guess MySQL probably has the same issue, but I have not tested it.
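The guard proposed above can be tried in isolation with the standard library. This sketch uses plain sqlite3 instead of the backend code, and a dict in place of RowProxy, to show that fetchone() returns None when nothing matches and that returning early avoids wrapping that None.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, text TEXT)")
connection.execute("INSERT INTO notes (id, text) VALUES (1, 'hello')")


def fetch_one(note_id):
    cursor = connection.execute(
        "SELECT id, text FROM notes WHERE id = ?", (note_id,)
    )
    row = cursor.fetchone()
    if row is None:
        # No matching row: return early instead of wrapping None.
        return None
    columns = [description[0] for description in cursor.description]
    return dict(zip(columns, row))


found = fetch_one(1)
missing = fetch_one(2)
print(found, missing)
```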
Hi,
I fully apologise for creating this issue but I'm trying to figure if you have a place to meet (like a slack?). I have questions which don't always warrant an issue or a SO question.
Thanks and sorry again for the spam. Feel free to close :)
We may want to add an aiopg driver, enabled with postgresql+aiopg://localhost/my_database.
Having more than one Postgres driver would be great for resilience.
The name databases is not search-engine friendly, nor is it friendly for human beings.
I have the following table:
metadata = sqlalchemy.MetaData()
clients = sqlalchemy.Table(
    "clients",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("name", sqlalchemy.String, nullable=False, unique=True),
    sqlalchemy.Column("dsn", sqlalchemy.String, nullable=False),
)
If I run this:
query2 = clients.insert()
values = [
    {"name": "default", "dsn": "postgresql://foo:bar@host1/dev"},
    {"name": "local", "dsn": "postgresql://foo:bar@host2/dev"},
]
rq2 = await database.execute(query=query2, values=values[0])
then rq2 is equal to the id of the inserted row.
Now, if I try:
response = await database.execute_many(query=query2, values=values)
the rows are indeed inserted correctly, but I don't get the ids of the inserted rows, which is a problem because they are foreign keys in another table that I plan to use in further operations.
The question is: is this intended/normal, in which case I'm OK with looping over the values I want to insert and collecting each inserted id to deal with later, or is there a way to enhance execute_many?
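The loop-and-collect fallback mentioned in the question can be sketched with the standard library. This uses plain sqlite3 rather than databases, so the table DDL and the lastrowid attribute belong to sqlite3; with databases the analogous loop would await database.execute once per row, which according to the report returns the inserted id.

```python
import sqlite3

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE clients ("
    "id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE, dsn TEXT NOT NULL)"
)

values = [
    {"name": "default", "dsn": "postgresql://foo:bar@host1/dev"},
    {"name": "local", "dsn": "postgresql://foo:bar@host2/dev"},
]

# Insert one row at a time so each generated id can be captured.
inserted_ids = []
for row in values:
    cursor = connection.execute(
        "INSERT INTO clients (name, dsn) VALUES (:name, :dsn)", row
    )
    inserted_ids.append(cursor.lastrowid)

print(inserted_ids)
```

The trade-off is one round trip per row instead of a single batched call.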
For working from sync test cases, or for working from the console, it'd be useful to be able to acquire a synchronous interface onto a connection.
E.g.
database = Database(DATABASE_URL)
connection = database.sync_connection()
connection.execute(...)
Or...
@pytest.fixture
def connection():
    return database.sync_connection()

def test_something(client, database):
    client.post("/users", json=...)
    assert database.fetch_all(...) == ...
When using asyncpg, I am getting InterfaceError: cannot perform operation: another operation is in progress when running multiple DB calls concurrently.
I am able to replicate and isolate the issue with force_rollback=True, although in the project where I encountered this bug it happens regardless.
Here is the isolated issue:
import asyncio

import databases
from starlette.applications import Starlette
from starlette.config import Config
from starlette.responses import JSONResponse

config = Config('.env')
DATABASE_URL = config('DATABASE_URL', default='postgresql://postgres@localhost:5432/postgres')
database = databases.Database(DATABASE_URL, force_rollback=True)
app = Starlette()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.route("/test", methods=["GET"])
async def test_view(request):
    await asyncio.gather(
        get_from_db(),
        get_from_db(),
    )
    return JSONResponse({"success": True})

async def get_from_db():
    return await database.fetch_all("SELECT pg_sleep(1)")

if __name__ == '__main__':
    from starlette.testclient import TestClient
    with TestClient(app) as test_client:
        test_client.get('/test')
Result
$ python app.py
Traceback (most recent call last):
File "app.py", line 40, in <module>
test_client.get('/test')
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 546, in get
return self.request('GET', url, **kwargs)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 382, in request
json=json,
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
resp = self.send(prep, **send_kwargs)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
r = adapter.send(request, **kwargs)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 211, in send
raise exc from None
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 208, in send
loop.run_until_complete(connection(receive, send))
File "/Users/ryan/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/middleware/errors.py", line 125, in asgi
raise exc from None
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/middleware/errors.py", line 103, in asgi
await asgi(receive, _send)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/exceptions.py", line 74, in app
raise exc from None
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/exceptions.py", line 63, in app
await instance(receive, sender)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 41, in awaitable
response = await func(request)
File "app.py", line 28, in test_view
get_from_db(),
File "app.py", line 34, in get_from_db
return await database.fetch_all("SELECT pg_sleep(1)")
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 95, in fetch_all
return await connection.fetch_all(query, values)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 179, in fetch_all
return await self._connection.fetch_all(self._build_query(query, values))
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/backends/postgres.py", line 137, in fetch_all
rows = await self._connection.fetch(query, *args)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 421, in fetch
return await self._execute(query, args, 0, timeout)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 1412, in _execute
with self._stmt_exclusive_section:
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 1847, in __enter__
'cannot perform operation: another operation is in progress')
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "app.py", line 40, in <module>
test_client.get('/test')
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 415, in __exit__
loop.run_until_complete(self.wait_shutdown())
File "/Users/ryan/.pyenv/versions/3.7.2/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 435, in wait_shutdown
self.task.result()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/testclient.py", line 420, in lifespan
await inner(self.receive_queue.get, self.send_queue.put)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 483, in asgi
await self.shutdown()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/starlette/routing.py", line 468, in shutdown
await handler()
File "app.py", line 21, in shutdown
await database.disconnect()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 74, in disconnect
await self._global_transaction.__aexit__()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 256, in __aexit__
await self.rollback()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/core.py", line 297, in rollback
await self._transaction.rollback()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/databases/backends/postgres.py", line 215, in rollback
await self._transaction.rollback()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/transaction.py", line 219, in rollback
await self.__rollback()
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/transaction.py", line 198, in __rollback
await self._connection.execute(query)
File "/Users/ryan/.local/share/virtualenvs/postgres-bug-tR_IhxOx/lib/python3.7/site-packages/asyncpg/connection.py", line 273, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 301, in query
File "asyncpg/protocol/protocol.pyx", line 659, in asyncpg.protocol.protocol.BaseProtocol._check_state
asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
When running into this issue with force_rollback=False, since there isn't supposed to be a global connection and transaction, I was able to get around the issue by changing the following function:
async def get_from_db():
    return await database.fetch_all("SELECT pg_sleep(1)")
to:
async def get_from_db():
    async with Connection(database._backend) as conn:
        return await conn.fetch_all("SELECT pg_sleep(1)")
I tried using async with database.connection() as conn: but it would return the same connection that is currently being used. I guess I was just trying to understand the logic of sharing a single pooled connection, instead of letting the pool allocate connections (when available), since an asyncpg Connection only allows one statement at a time.
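To make the constraint concrete, here is a standard-library simulation. OneAtATimeConnection is a hypothetical stand-in that rejects overlapping operations, as the connection in the report does. The sketch shows the overlapping calls failing and then succeeding once access is serialised with an asyncio.Lock, which is one alternative to giving each call its own pooled connection.

```python
import asyncio


class OneAtATimeConnection:
    """Hypothetical connection that rejects overlapping operations."""

    def __init__(self):
        self._busy = False

    async def fetch_all(self, query):
        if self._busy:
            raise RuntimeError("another operation is in progress")
        self._busy = True
        try:
            await asyncio.sleep(0.01)  # simulate waiting on the server
            return [query]
        finally:
            self._busy = False


async def main():
    connection = OneAtATimeConnection()

    # Two overlapping calls on the same connection: the second one fails.
    outcomes = await asyncio.gather(
        connection.fetch_all("SELECT 1"),
        connection.fetch_all("SELECT 2"),
        return_exceptions=True,
    )
    overlapping_failed = any(isinstance(o, RuntimeError) for o in outcomes)

    # Serialising access with a lock lets both calls succeed in turn.
    lock = asyncio.Lock()

    async def locked_fetch(query):
        async with lock:
            return await connection.fetch_all(query)

    serialised = await asyncio.gather(
        locked_fetch("SELECT 1"), locked_fetch("SELECT 2")
    )
    return overlapping_failed, serialised


overlapping_failed, serialised = asyncio.run(main())
print(overlapping_failed, serialised)
```

A lock trades concurrency for safety; separate connections from the pool keep the queries parallel.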
Here is a simplified example extracted from /databases/backends/postgres.py:connect():
import asyncio
import asyncpg
from databases import DatabaseURL

async def run():
    url = 'postgresql://user:password@localhost/dbname'
    pool = await asyncpg.create_pool(str(DatabaseURL(url)))

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
I am getting the error socket.gaierror: [Errno -3] Temporary failure in name resolution, and that's fair, since according to https://magicstack.github.io/asyncpg/current/api/index.html#connection-pools, asyncpg.create_pool accepts dsn (str): connection arguments specified as a single string in the following format: postgres://user:pass@host:port/database?option=value.
So maybe we need to add a note to the docs, or handle that situation via more advanced parsing.
I'm developing a web app with starlette and databases, and I use docker-compose to run it, but I ran into an unexpected problem.
My docker-compose file is below:
version: "3.0"
services:
  app:
    build: ./
    links:
      - "db:mysqlserver"
    volumes:
      - ./db/envfile:/app/.env
    ports:
      - 8000:8000
  db:
    image: mysql:5.7
    expose:
      - 3306
    environment:
      MYSQL_ROOT_PASSWORD: test
    volumes:
      - ./db/db.sql:/docker-entrypoint-initdb.d/db.sql
    restart: always
    hostname: mysqlserver
The database wasn't available when the app started, so I added an exception handler for it.
async def connect_db():
    try:
        await database.connect()
    except Exception as e:
        logger.error("connect db error, res is {}".format(e))
        await asyncio.sleep(3)
        asyncio.ensure_future(connect_db())

app.add_event_handler("startup", connect_db)
But database.connect didn't raise any exception, so I can't handle it.
I don't know whether my error handling is correct; maybe connect should throw an exception when it fails?
Thank you
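For reference, a retry loop along these lines is one way to wait for a database that starts late. This is a sketch under a loud assumption: it only helps if connect() raises on failure, which the report above says was not the case. connect_with_retry and FlakyDatabase are hypothetical names, and OSError is just a placeholder for whatever exception the driver would raise.

```python
import asyncio


async def connect_with_retry(connect, attempts=5, delay=0.01):
    """Call `connect()` until it succeeds or `attempts` is exhausted."""
    for attempt in range(1, attempts + 1):
        try:
            await connect()
            return attempt
        except OSError:
            if attempt == attempts:
                raise  # give up and surface the last error
            await asyncio.sleep(delay)


class FlakyDatabase:
    """Hypothetical stand-in whose first two connection attempts fail."""

    def __init__(self):
        self.calls = 0

    async def connect(self):
        self.calls += 1
        if self.calls < 3:
            raise OSError("database not ready")


database = FlakyDatabase()
attempts_used = asyncio.run(connect_with_retry(database.connect))
print(attempts_used)
```

A bounded loop keeps startup from retrying forever, and avoids scheduling a new task from inside the exception handler.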