pyathena's Issues

{key=value} to json

Athena has a MAP data type whose query results are rendered as {key=value}. How can this be converted to JSON?
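Lacking a built-in conversion, a minimal parser sketch could look like the following (an illustration only, assuming flat maps with no commas or '=' characters inside keys or values, and no nested structures):

```python
import json

def athena_map_to_json(raw):
    """Convert an Athena MAP rendering like '{k1=v1, k2=v2}' to a JSON string.

    Minimal sketch: assumes keys/values contain no commas, '=' signs,
    or nested maps.
    """
    inner = raw.strip().lstrip("{").rstrip("}")
    pairs = {}
    for item in filter(None, (p.strip() for p in inner.split(","))):
        key, _, value = item.partition("=")
        pairs[key.strip()] = value.strip()
    return json.dumps(pairs)

print(athena_map_to_json("{color=red, size=XL}"))
# → {"color": "red", "size": "XL"}
```

For nested maps or values containing delimiters, a real tokenizer would be needed instead of a split.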

make s3_staging_dir optional when work group is passed

Since the query result location can be configured at the workgroup level, would it be possible to make the s3_staging_dir parameter optional?

Excerpt from the boto3 documentation:

ResultConfiguration (dict) --
Specifies information about where and how to save the results of the query execution. If the query runs in a workgroup, then workgroup's settings may override query settings. This affects the query results location. The workgroup settings override is specified in EnforceWorkGroupConfiguration (true/false) in the WorkGroupConfiguration. See WorkGroupConfiguration$EnforceWorkGroupConfiguration .

OutputLocation (string) --
The location in Amazon S3 where your query results are stored, such as s3://path/to/query/bucket/ . To run the query, you must specify the query results location using one of the ways: either for individual queries using either this setting (client-side), or in the workgroup, using WorkGroupConfiguration . If none of them is set, Athena issues an error that no output location is provided. For more information, see Query Results . If workgroup settings override client-side settings, then the query uses the settings specified for the workgroup. See WorkGroupConfiguration$EnforceWorkGroupConfiguration .

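If pyathena supported this, callers might build connect kwargs along these lines. This is a sketch of the requested behavior; the work_group parameter name is an assumption based on the Athena API's WorkGroup field, not pyathena's documented API:

```python
def connection_kwargs(region, work_group=None, s3_staging_dir=None):
    """Build kwargs for pyathena.connect().

    Sketch of the requested behavior: s3_staging_dir may be omitted when a
    work group with an enforced result location is supplied.
    """
    if work_group is None and s3_staging_dir is None:
        raise ValueError("need either work_group or s3_staging_dir")
    kwargs = {"region_name": region}
    if work_group is not None:
        kwargs["work_group"] = work_group
    if s3_staging_dir is not None:
        kwargs["s3_staging_dir"] = s3_staging_dir
    return kwargs

# conn = pyathena.connect(**connection_kwargs("us-east-1", work_group="primary"))
```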

Time difference b/w cursor.execute() function in PyAthena and Query execution time displayed in Athena Console

When running any given query through the PyAthena driver, a variable delay of 2-5 seconds is observed between the time taken by the execute() call and the query execution time shown in the AWS Athena console.

For example,
Time taken by PyAthena execute function to return result: 6.2 seconds
Query execution time in Athena console: 3.42 seconds

Is this a known issue? Can someone please point me in the right direction to reduce this delay?

Closing connection

The README has no examples of closing the Athena connection with conn.close(), though there are some snippets in the tests. Are there any recommendations on this? Is it better to use a with statement, or is the connection closed automatically?
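A minimal sketch of explicit cleanup using the DB-API close() methods via contextlib.closing (the connect argument below stands in for pyathena.connect; whether pyathena's objects also support the with statement directly depends on the version):

```python
from contextlib import closing

def run_query(connect, sql):
    """Run a query and guarantee cursor and connection are closed afterwards.

    connect is any zero-argument callable returning a DB-API connection,
    e.g. lambda: pyathena.connect(...).
    """
    with closing(connect()) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(sql)
            return cursor.fetchall()
```

contextlib.closing only requires a close() method, so it works with any DB-API driver regardless of context-manager support.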

Larger-than-memory queries?

I'm hoping to use Athena with Dask, performing queries which return 10-30 GB and then training some distributed ML algorithms. Any suggestions for concurrent/distributed I/O for such a task? I've been quite happy with the pandas cursor for smaller local use, following the examples in the PyAthena documentation, but I still have no idea what I am actually doing -- does the pandas cursor do concurrent I/O, or is it limited to one core?

I apologize in advance if this question belongs on some other forum-- let me know and I'll gladly move the conversation there. Thanks!

retry capability in pyathena

Hi Team

Could you please add retry capability to pyathena? Since Athena is serverless, we might see failures depending on the load on AWS. If this feature is already present, could you please share details on how to use it?

Thank you
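Absent built-in support, a generic exponential-backoff wrapper (not pyathena API; the parameter names here are illustrative) can be put around cursor.execute:

```python
import time

def with_retries(fn, attempts=5, base_delay=0.5, retryable=(Exception,), sleep=time.sleep):
    """Call fn(), retrying throttling-style failures with exponential backoff.

    Generic sketch: wrap a cursor call as
    with_retries(lambda: cursor.execute(sql), retryable=(SomeAthenaError,)).
    """
    for attempt in range(attempts):
        try:
            return fn()
        except retryable:
            if attempt == attempts - 1:
                raise  # out of attempts, surface the last error
            sleep(base_delay * (2 ** attempt))
```

The sleep parameter is injectable so the backoff can be tested without real delays.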

Unable to verify/create output bucket

I am not sure if this is a pyathena problem, an AWS permissions problem, or something else entirely.

The following error happens sometimes (and sometimes it works; it's really non-deterministic):

pyathena.error.DatabaseError: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: Unable to verify/create output bucket <my-output-bucket>

The strange thing is that it may work for a few days before the error occurs again. I have no idea why this happens (the AWS user has write access to the bucket, and since it sometimes works, the permissions must be correct). I added AWS Glue permissions because a forum post suggested it, but that didn't help.

Any hints on how to debug this problem are appreciated.

SQL containing % (e.g. LIKE patterns) cannot be run

Hi,

I noticed that some basic valid SQL can't be run using pyathena, e.g.

SELECT key FROM schema.table WHERE key LIKE '%greenhopper%' LIMIT 10

which normally runs fine, but gives the following stack trace in pyathena (it fails immediately when attempting to parse the %):
python pyathenatest.py
Traceback (most recent call last):
  File "pyathenatest.py", line 9, in <module>
    cursor.execute(sql)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/pyathena/util.py", line 29, in _wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/pyathena/cursor.py", line 213, in execute
    query = self._formatter.format(operation, parameters)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/pyathena/formatter.py", line 112, in format
    return (operation % kwargs).strip()
TypeError: float argument required, not dict

I'll have a look myself at the code for that function.
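Given that the traceback shows the formatter applying Python %-style substitution (`operation % kwargs`), the usual printf-style workaround is to escape literal percent signs as %%. A pure-Python demonstration of the mechanism (not pyathena itself):

```python
# A literal '%' in text passed through %-style formatting must be doubled;
# formatting then collapses '%%' back to a single '%':
sql = "SELECT key FROM schema.table WHERE key LIKE '%%greenhopper%%' LIMIT 10"
formatted = sql % {}
print(formatted)
# → SELECT key FROM schema.table WHERE key LIKE '%greenhopper%' LIMIT 10
```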

Django ORM support

Any chances to add a Django ORM support in the future?
Not sure if it's doable due to the Django db connection layer constraints...
Thanks

PyAthena encryption with KMS key is not working

I am querying Athena using PyAthena-1.4.6.

connection = connect(
    s3_staging_dir='s3://' + athena_results_s3_bucket + '/athena_results',
    region_name=region,
    encryption_option='SSE-KMS',
    kms_key=kms_key
)
cursor = connection.cursor()
cursor.execute(sql_query)

I am able to query the data, but the data in the s3_staging_dir is not encrypted. I tried all possible values for encryption_option but could not find any documentation for this.
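One thing worth checking: the Athena StartQueryExecution API expects EncryptionOption values written with underscores (SSE_S3, SSE_KMS, CSE_KMS), not hyphens, so 'SSE-KMS' may be silently ignored. A sketch of the connect kwargs under that assumption (whether pyathena forwards these options depends on the version):

```python
def encrypted_connect_kwargs(staging_bucket, region, kms_key):
    """Build kwargs for an SSE-KMS-encrypted Athena result location.

    Sketch only: valid EncryptionOption values in the Athena API are
    SSE_S3, SSE_KMS and CSE_KMS (underscores, not hyphens).
    """
    return {
        "s3_staging_dir": "s3://{}/athena_results/".format(staging_bucket),
        "region_name": region,
        "encryption_option": "SSE_KMS",  # note: underscore, not 'SSE-KMS'
        "kms_key": kms_key,
    }

# connection = pyathena.connect(**encrypted_connect_kwargs(bucket, region, key))
```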

Support missing values in Boolean columns

#80
Similar to the issue above, I ran into a problem with missing values in boolean columns. When parsing to pandas, the conversion fails because we set the dtype to bool, and bool does not support NA.

pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._read_rows()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_column_data()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_tokens()
pandas/_libs/parsers.pyx in pandas._libs.parsers.TextReader._convert_with_dtype()
ValueError: Bool column has NA values in column 3

Could we wrap the following in a try/except ValueError:

df = pd.read_csv(io.BytesIO(response['Body'].read()),

If it fails we could raise a warning and try again the DataFrame creation with relaxed constraints:

  • Either we follow pandas and convert the column involved to object (I'm not sure we can detect the faulty column, so we could convert all columns with dtypes that are not NA-compatible.)
  • Or convert booleans to int (I understand from the PR above that int is already NA-safe) and warn the user.

If needed I can write a PR once we agree on a solution!

Thanks
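The NA-safe behavior being asked for can be illustrated with a per-cell converter sketch (pure illustration, not pyathena's actual converter API): missing cells map to None instead of failing a strict bool dtype.

```python
def parse_bool(cell):
    """Convert an Athena CSV boolean cell to True/False/None.

    Sketch of NA-safe boolean handling: empty or absent cells (missing
    values) map to None rather than raising under a strict bool dtype.
    """
    if cell is None or cell == "":
        return None
    return cell.strip().lower() == "true"
```

A column parsed this way ends up with object dtype in pandas, which is the "relaxed constraints" fallback described above.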

Cursor doesn't support config property

Hey @laughingman7743!
First of all thank you for such a good tool to work with Athena.
Now I would like to come back to the problem.
When I try to pass the 'config' property for boto3.client as an element of kwargs,
pyathena.connect(s3_staging_dir=env['athena_staging_dir'], region_name=env['region'], config=env['config']), it fails because the Session constructor unpacks the kwargs parameter and there is no 'config' argument, so it never reaches boto3.client in connection.py.
Could we fix this by using _get_default_session as implemented in boto3? Or we could add some properties to the constructor in connection.py, because right now we can't use any of the optional boto3.client kwargs.

Latest pyathena version (1.6.0) requires pandas

It seems that pyathena 1.6.0 requires pandas, although 1.5.1 didn't. It's impossible to import pyathena if pandas isn't installed:

pip install pyathena
python -c 'from pyathena.cursor import AsyncCursor'

ImportError: No module named pandas

Works in version 1.5.1

query succeeds with correct answer, but a botocore.errorfactory.InvalidRequestException error message is logged

I am using PyAthena to query the recently released Common Crawl parquet archives, as described in
http://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/

I set up and tested my index as described in the above article, then ran the same query as in the article using PyAthena:

import pandas as pd
from urllib.parse import quote_plus  # PY2: from urllib import quote_plus
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'

engine = create_engine(conn_str.format(
    aws_access_key_id=quote_plus(KEY),
    aws_secret_access_key=quote_plus(SECRET),
    region_name='us-east-1',
    schema_name='ccindex',
    s3_staging_dir=quote_plus('s3://athenatmp2/xyz/')))


pd.read_sql(
    """
SELECT COUNT(*) AS count,
       url_host_registered_domain
FROM "ccindex"."ccindex"
WHERE crawl = 'CC-MAIN-2018-05'
  AND subset = 'warc'
  AND url_host_tld = 'no'
GROUP BY  url_host_registered_domain
HAVING (COUNT(*) >= 100)
ORDER BY  count DESC
    """, engine)

When I run the above code, the following error message gets printed out:

Failed to execute query.
Traceback (most recent call last):
  File "/home/dm/anaconda3/lib/python3.6/site-packages/pyathena/common.py", line 165, in _execute
    **request)
  File "/home/dm/anaconda3/lib/python3.6/site-packages/pyathena/util.py", line 45, in retry_api_call
    return retry(func, *args, **kwargs)
  File "/home/dm/anaconda3/lib/python3.6/site-packages/tenacity/__init__.py", line 313, in call
    start_time=start_time)
  File "/home/dm/anaconda3/lib/python3.6/site-packages/tenacity/__init__.py", line 269, in iter
    return fut.result()
  File "/home/dm/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 425, in result
    return self.__get_result()
  File "/home/dm/anaconda3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/home/dm/anaconda3/lib/python3.6/site-packages/tenacity/__init__.py", line 316, in call
    result = fn(*args, **kwargs)
  File "/home/dm/anaconda3/lib/python3.6/site-packages/botocore/client.py", line 324, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/home/dm/anaconda3/lib/python3.6/site-packages/botocore/client.py", line 622, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: Conversion = '''

The program keeps running and returns the same answer as obtained in the Athena web console:

Out[1]: 
       count   url_host_registered_domain
0     278171                     blogg.no
1     177143               aftenposten.no
2     172755                  blogspot.no
3     166156            platekompaniet.no
4     156718                       nrk.no
5     115110                      urlm.no
6     104641                    fruugo.no
7      90093                       uio.no
8      65259                    zoover.no
9      56452                       uib.no
10     56250              anbudstorget.no
11     52171                  multicom.no
12     50460                     tanum.no
13     49668                       snl.no
14     41147               kursagenten.no
15     40221               slideplayer.no
16     35110                     kayak.no
17     31495                   skruvat.no
18     29917                       mtv.no
19     29659                nettavisen.no
20     27738               regjeringen.no
21     27455                 haugenbok.no
22     27350                   footway.no
...

PyAthena or SQLAlchemy must be reporting and then swallowing the error from deeper down.
While the answer is correct, the error message is concerning.

The test for SQLAlchemy is broken

============================= test session starts ==============================
platform linux -- Python 3.6.3, pytest-3.8.2, py-1.6.0, pluggy-0.7.1
rootdir: /home/travis/build/laughingman7743/PyAthena, inifile: setup.cfg
plugins: flake8-1.0.2, cov-2.6.0
collected 121 items
setup.py .                                                               [  0%]
pyathena/__init__.py .                                                   [  1%]
pyathena/async_cursor.py .                                               [  2%]
pyathena/common.py .                                                     [  3%]
pyathena/connection.py .                                                 [  4%]
pyathena/converter.py .                                                  [  4%]
pyathena/cursor.py .                                                     [  5%]
pyathena/error.py .                                                      [  6%]
pyathena/formatter.py .                                                  [  7%]
pyathena/model.py .                                                      [  8%]
pyathena/pandas_cursor.py .                                              [  9%]
pyathena/result_set.py .                                                 [  9%]
pyathena/sqlalchemy_athena.py .                                          [ 10%]
pyathena/util.py .                                                       [ 11%]
tests/__init__.py .                                                      [ 12%]
tests/conftest.py .                                                      [ 13%]
tests/test_async_cursor.py ...............                               [ 25%]
tests/test_cursor.py ................................                    [ 52%]
tests/test_formatter.py ......................                           [ 70%]
tests/test_pandas_cursor.py .................                            [ 84%]
tests/test_result_set.py ...                                             [ 86%]
tests/test_sqlalchemy_athena.py .....F.........                          [ 99%]
tests/util.py .                                                          [100%]
=================================== FAILURES ===================================
_____________________ TestSQLAlchemyAthena.test_has_table ______________________
self = <tests.test_sqlalchemy_athena.TestSQLAlchemyAthena testMethod=test_has_table>
engine = Engine(awsathena+rest://athena.[secure].amazonaws.com:443/test_pyathena_6msjtv4t1y?s3_staging_dir=[secure])
connection = <sqlalchemy.engine.base.Connection object at 0x7f2dfb800390>
    @with_engine
    def test_has_table(self, engine, connection):
        self.assertTrue(Table('one_row', MetaData(bind=engine)).exists())
>       self.assertFalse(Table('this_table_does_not_exist', MetaData(bind=engine)).exists())
E       AssertionError: True is not false
tests/test_sqlalchemy_athena.py:115: AssertionError

PandasCursor doesn't automatically convert int columns with NA's to floats

I'm querying a large athena table and can successfully run a query using the below code, however it's really slow (for reasons covered in #46).

conn = pyathena.connect(**at.athena_creds)
df = pd.read_sql(sql, conn)

I would really like to take advantage of the performance boost that PandasCursor offers, however, when I run the code below, I get a value error.

conn = pyathena.connect(**at.athena_creds, cursor_class=PandasCursor)
cursor = conn.cursor()  # was at_con.cursor(), a stray variable name
df = cursor.execute(sql).as_pandas()

>>> ValueError: Integer column has NA values in column 18

Now I understand why I'm getting this ValueError. I have an int column in my Athena table which has NA values in it, which pandas notoriously doesn't handle well (NaNs are floats in pandas' eyes, not ints). pd.read_sql() seems to handle this gracefully: it recognizes there is an int column with NaNs and converts it to a float column. It would be great if pyathena did the same thing.
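The widening behavior being requested can be sketched as a per-cell converter (illustration only, not pyathena's converter API): integers stay integral, but any missing value forces the column toward float, mirroring how pandas widens int columns containing NaN.

```python
def parse_nullable_int(cell):
    """Sketch of read_sql-like behavior for Athena CSV integer cells.

    Present values parse as int; a missing value becomes float('nan'),
    matching how pandas widens an int column that contains NA.
    """
    if cell is None or cell == "":
        return float("nan")
    return int(cell)
```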

Ctrl-C while running query kills python session

Signal handling should be improved if possible, because both:

  1. Being unable to abort at all, and
  2. Abort at the cost of quitting a running REPL

are barely acceptable for interactive usage.

Fetching large result sets can be sped up massively using S3

Hello, thanks for such a great library; it has really made working with Athena from Python easy.

I download a lot of large results from Athena queries (in the gigabyte range). Unfortunately, using PyAthena for this is very (very) slow -- hundreds of times slower than just downloading the results from S3. This is because it is fetching and converting a few rows at a time via the Athena API.

I have taken to working around it with the following approach. Basically, execute the query, get the output location, then fetch with S3 (and convert to Pandas in my case).

from smart_open import smart_open  # pandas can't easily read from S3 using a role/profile

athena_cursor = pyathena.connect(...).cursor()

def query(sql, profile=None):
    """:Return: a Pandas DataFrame of results from a `sql` query executed against AWS Athena."""
    athena_cursor.execute(sql)
    # MUCH faster than PyAthena reading a few rows at a time via the API
    return pd.read_csv(smart_open(athena_cursor.output_location, profile_name=profile))

(I'm using smart_open to make fetching from S3 easy, but of course the same thing can be accomplished with just boto.)

Could something like this be incorporated directly into PyAthena? Perhaps connect could have an option fetch_with_s3=True or similar. You'd probably still need to do type conversion, but there could be a fast path for as_pandas() to let Pandas do it all in one go.

Just a thought; thanks again!

Proxies

I work behind a corporate Proxy. I randomly get 407 proxy authentication errors. I'm using SQLAlchemy with the pyAthena awsathena+rest dialect.

I'm setting up my proxy using:

    environ["http_proxy"] = HTTP_PROXY
    environ["https_proxy"] = HTTP_PROXY

I create my engine using this url:

athena_url = f'awsathena+rest://{AWS_ACCESS_KEY_ID}:{AWS_SECRET_ACCESS_KEY}@athena.{REGION_NAME}.amazonaws.com:443/{SCHEMA_NAME}?s3_staging_dir={S3_STAGING_DIR}'

I noticed that boto3 has a proxies option in its Config, and it did not appear that the pyAthena code allowed for that config option. Do the proxy options need to be passed through, or should the environ proxy settings be enough?

Appreciate any assistance.

Cursor doesn't support setting OutputLocation

The default OutputLocation is the same path as the table's location. It should be possible to set the location to somewhere else.

An example is when a user has read-only access to the table (and its related S3 location). The user cannot write to that S3 address, so they need to set OutputLocation to somewhere else.

Asynchronous API?

It might be nice to also expose an "asynchronous" api as well.

.execute_async(...) could act exactly like .execute() except return before the calls to ._poll() and ._pre_fetch()

You'd need an additional call to wait for the results (.await(..)) that would "finish" the execution.

This would be very useful for systems that process these queries in background jobs, as you'd get access to the query_id for various Athena APIs (cancellation, etc.) very quickly and could store it durably somewhere.

Thoughts?
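For reference, the repository's test suite (tests/test_async_cursor.py in the CI logs elsewhere on this page) exercises an AsyncCursor whose execute() returns the query ID immediately together with a future. A minimal sketch of that contract, with the pyathena imports commented out since running them needs an AWS setup:

```python
# from pyathena import connect
# from pyathena.async_cursor import AsyncCursor

def submit(cursor, sql):
    """Kick off a query without blocking.

    Assumes an AsyncCursor-style execute() returning (query_id, future):
    the query_id is available immediately (for cancellation etc.), and
    future.result() blocks until the query finishes.
    """
    query_id, future = cursor.execute(sql)
    return query_id, future

# Usage sketch (staging dir and region are placeholders):
# cursor = connect(s3_staging_dir="s3://bucket/path/",
#                  region_name="us-east-1",
#                  cursor_class=AsyncCursor).cursor()
# query_id, future = submit(cursor, "SELECT 1")
# result_set = future.result()
```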

Asynchronous Pandas?

It seems that one can either use the Async version of PyAthena, or use the Pandas capabilities, either in the form of pd.read_sql() or in the form of result_set.as_pandas().

It seems that using any form of Pandas integration makes all queries work in a synchronous manner, even if several calls are wrapped into a ThreadPoolExecutor.

Example:

import time
import concurrent.futures

import pandas as pd
import pyathena

sql_engine = pyathena.connect(s3_staging_dir=app.config['S3_STAGING_DIR'])

def get_result(sql):
    print('sending sql query at {}'.format(time.ctime(int(time.time()))))
    result = pd.read_sql(sql, sql_engine)  # was pd.read_sql(sql_query, ...), an undefined name
    return result

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    future_to_df = {executor.submit(get_result, item['sql']): item['measure_name'] for item in sql_queries}
    for future in concurrent.futures.as_completed(future_to_df):
        measure_name = future_to_df[future]
        raw_df_results[measure_name] = future.result()

Will output:

sending sql_query at Sun Apr 14 23:09:30 2019
sending sql_query at Sun Apr 14 23:09:30 2019
sending sql_query at Sun Apr 14 23:09:30 2019

But Athena's own history log will show that the queries were submitted synchronously:

# Athena history log
Submitted: 2019/04/15 00:09:30 UTC+-1 - Run time: 5s
Submitted: 2019/04/15 00:09:35 UTC+-1 - Run time: 6s
Submitted: 2019/04/15 00:09:41 UTC+-1 - Run time: 4s

Whereas if I use pyathena.async_cursor.AsyncCursor instead, Athena's logs will show that all queries were submitted at the same time, and therefore will treat them accordingly. However, there is, then, no way to use the Pandas integration from PyAthena.

What would be the best way to combine Pandas integration + Asynchronous submission?
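The CI output elsewhere on this page lists tests/test_async_pandas_cursor.py, so an AsyncPandasCursor combining both behaviors exists in the repo. A fan-out sketch under the assumption that its execute() returns (query_id, future) and the resolved result set offers as_pandas():

```python
# from pyathena import connect
# from pyathena.async_pandas_cursor import AsyncPandasCursor

def run_all(cursor, named_queries):
    """Submit every query before collecting any result, so Athena can run
    them concurrently; each future resolves to a result set whose
    as_pandas() yields a DataFrame.

    Assumes an AsyncPandasCursor-style execute() returning (query_id, future).
    named_queries maps a label (e.g. measure_name) to its SQL text.
    """
    futures = [(name, cursor.execute(sql)[1]) for name, sql in named_queries.items()]
    return {name: future.result().as_pandas() for name, future in futures}

# Usage sketch (placeholders for staging dir and region):
# cursor = connect(s3_staging_dir="s3://bucket/path/",
#                  region_name="us-east-1",
#                  cursor_class=AsyncPandasCursor).cursor(max_workers=16)
# raw_df_results = run_all(cursor, {q["measure_name"]: q["sql"] for q in sql_queries})
```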

Python 3.6.4 pip install doesn't work (futures dependency)

Attempting

pip3 install pyathena

results in an error:

futures requires Python '>=2.6, <3' but the running Python is 3.6.4

This is because one of the requirements is futures, which is the Python 2 backport of concurrent.futures and is not needed on Python 3.

A workaround is to install an older version of futures first, then install pyathena:

pip3 install futures==3.1.1

AssertionError: table not found

https://travis-ci.org/laughingman7743/PyAthena/jobs/547213920

============================= test session starts ==============================
platform linux -- Python 3.7.1, pytest-4.6.3, py-1.8.0, pluggy-0.12.0
cachedir: .tox/py37/.pytest_cache
rootdir: /home/travis/build/laughingman7743/PyAthena, inifile: setup.cfg
plugins: cov-2.7.1, flake8-1.0.4
collected 146 items
setup.py .                                                               [  0%]
pyathena/__init__.py .                                                   [  1%]
pyathena/async_cursor.py .                                               [  2%]
pyathena/async_pandas_cursor.py .                                        [  2%]
pyathena/common.py .                                                     [  3%]
pyathena/connection.py .                                                 [  4%]
pyathena/converter.py .                                                  [  4%]
pyathena/cursor.py .                                                     [  5%]
pyathena/error.py .                                                      [  6%]
pyathena/formatter.py .                                                  [  6%]
pyathena/model.py .                                                      [  7%]
pyathena/pandas_cursor.py .                                              [  8%]
pyathena/result_set.py .                                                 [  8%]
pyathena/sqlalchemy_athena.py .                                          [  9%]
pyathena/util.py .                                                       [ 10%]
tests/__init__.py .                                                      [ 10%]
tests/conftest.py .                                                      [ 11%]
tests/test_async_cursor.py ...............                               [ 21%]
tests/test_async_pandas_cursor.py ..................                     [ 34%]
tests/test_cursor.py ................................                    [ 56%]
tests/test_formatter.py ......................                           [ 71%]
tests/test_pandas_cursor.py ...................                          [ 84%]
tests/test_result_set.py ...                                             [ 86%]
tests/test_sqlalchemy_athena.py ...........F.......                      [ 99%]
tests/util.py .                                                          [100%]
=================================== FAILURES ===================================
__________________ TestSQLAlchemyAthena.test_reflect_schemas ___________________
self = <tests.test_sqlalchemy_athena.TestSQLAlchemyAthena testMethod=test_reflect_schemas>
engine = Engine(awsathena+rest://athena.[secure].amazonaws.com:443/test_pyathena_qy34ol1req?s3_staging_dir=[secure])
connection = <sqlalchemy.engine.base.Connection object at 0x7fe6c557b208>
    @with_engine
    def test_reflect_schemas(self, engine, connection):
        insp = sqlalchemy.inspect(engine)
        schemas = insp.get_schema_names()
>       self.assertIn(SCHEMA, schemas)
E       AssertionError: 'test_pyathena_qy34ol1req' not found in []
tests/test_sqlalchemy_athena.py:106: AssertionError
=============================== warnings summary ===============================
tests/test_async_cursor.py::TestAsyncCursor::test_arraysize
  /home/travis/build/laughingman7743/PyAthena/.tox/py37/lib/python3.7/site-packages/botocore/vendored/requests/packages/urllib3/_collections.py:1: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
    from collections import Mapping, MutableMapping
tests/test_sqlalchemy_athena.py::TestSQLAlchemyAthena::test_reflect_select
  /home/travis/build/laughingman7743/PyAthena/.tox/py37/lib/python3.7/site-packages/sqlalchemy/sql/sqltypes.py:665: SAWarning: Dialect awsathena+rest does *not* support Decimal objects natively, and SQLAlchemy must convert from floating point - rounding errors and other issues may occur. Please consider storing Decimal numbers as strings or integers on this platform for lossless storage.
    "storage." % (dialect.name, dialect.driver)
-- Docs: https://docs.pytest.org/en/latest/warnings.html
----------- coverage: platform linux, python 3.7.1-final-0 -----------
Name                              Stmts   Miss  Cover
-----------------------------------------------------
pyathena/__init__.py                 33      5    85%
pyathena/async_cursor.py             46      3    93%
pyathena/async_pandas_cursor.py      18      3    83%
pyathena/common.py                  107     12    89%
pyathena/connection.py               69      9    87%
pyathena/converter.py                61     12    80%
pyathena/cursor.py                   45      0   100%
pyathena/error.py                    23      0   100%
pyathena/formatter.py                66      2    97%
pyathena/model.py                    51      4    92%
pyathena/pandas_cursor.py            50      0   100%
pyathena/result_set.py              276     17    94%
pyathena/sqlalchemy_athena.py       102      7    93%
pyathena/util.py                     26      3    88%
-----------------------------------------------------
TOTAL                               973     77    92%
Coverage HTML written to dir htmlcov
============== 1 failed, 145 passed, 2 warnings in 222.30 seconds ==============
ERROR: InvocationError for command /home/travis/build/laughingman7743/PyAthena/.tox/py37/bin/pytest --cov pyathena --cov-report html --cov-report term --flake8 (exited with code 1)
___________________________________ summary ____________________________________
ERROR:   py37: commands failed
The command "tox" exited with 1.

describe {table_name} raises an error

File "pandas/_libs/parsers.pyx", line 881, in pandas._libs.parsers.TextReader.read
File "pandas/_libs/parsers.pyx", line 896, in pandas._libs.parsers.TextReader._read_low_memory
File "pandas/_libs/parsers.pyx", line 950, in pandas._libs.parsers.TextReader._read_rows
File "pandas/_libs/parsers.pyx", line 937, in pandas._libs.parsers.TextReader._tokenize_rows
File "pandas/_libs/parsers.pyx", line 2132, in pandas._libs.parsers.raise_parser_error
pandas.errors.ParserError: Error tokenizing data. C error: Expected 1 fields in line 7, saw 2

The cursor class is PandasCursor; if the cursor class is set to Cursor, the error is not raised.
My schema is shown in the attached screenshot (image not captured in this export).

(pyathena.error.OperationalError) com.facebook.presto.hive.DataCatalogException: Namespace test_pyathena_xy12gnw2tu not found

__________________ TestSQLAlchemyAthena.test_get_table_names ___________________

self = <sqlalchemy.engine.base.Connection object at 0x7ff729120898>
dialect = <pyathena.sqlalchemy_athena.AthenaDialect object at 0x7ff72d35ec50>
constructor = <bound method type._init_statement of <class 'sqlalchemy.engine.default.DefaultExecutionContext'>>
statement = '\n                SELECT\n                  table_schema,\n                  table_name,\n                  column_na...       ordinal_position,\n                  comment\n                FROM information_schema.columns\n                '
parameters = {}
args = ('\n                SELECT\n                  table_schema,\n                  table_name,\n                  column_n...  ordinal_position,\n                  comment\n                FROM information_schema.columns\n                ', [])
conn = <sqlalchemy.pool._ConnectionFairy object at 0x7ff72bbca7f0>
context = <sqlalchemy.engine.default.DefaultExecutionContext object at 0x7ff72915ab38>

    def _execute_context(self, dialect, constructor,
                         statement, parameters,
                         *args):
        """Create an :class:`.ExecutionContext` and execute, returning
            a :class:`.ResultProxy`."""
    
        try:
            try:
                conn = self.__connection
            except AttributeError:
                conn = self._revalidate_connection()
    
            context = constructor(dialect, self, conn, *args)
        except BaseException as e:
            self._handle_dbapi_exception(
                e,
                util.text_type(statement), parameters,
                None, None)
    
        if context.compiled:
            context.pre_exec()
    
        cursor, statement, parameters = context.cursor, \
            context.statement, \
            context.parameters
    
        if not context.executemany:
            parameters = parameters[0]
    
        if self._has_events or self.engine._has_events:
            for fn in self.dispatch.before_cursor_execute:
                statement, parameters = \
                    fn(self, cursor, statement, parameters,
                       context, context.executemany)
    
        if self._echo:
            self.engine.logger.info(statement)
            self.engine.logger.info(
                "%r",
                sql_util._repr_params(parameters, batches=10)
            )
    
        evt_handled = False
        try:
            if context.executemany:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_executemany:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_executemany(
                        cursor,
                        statement,
                        parameters,
                        context)
            elif not parameters and context.no_parameters:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute_no_params:
                        if fn(cursor, statement, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute_no_params(
                        cursor,
                        statement,
                        context)
            else:
                if self.dialect._has_events:
                    for fn in self.dialect.dispatch.do_execute:
                        if fn(cursor, statement, parameters, context):
                            evt_handled = True
                            break
                if not evt_handled:
                    self.dialect.do_execute(
                        cursor,
                        statement,
                        parameters,
>                       context)

.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1182: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyathena.sqlalchemy_athena.AthenaDialect object at 0x7ff72d35ec50>
cursor = <pyathena.cursor.Cursor object at 0x7ff72915af98>
statement = '\n                SELECT\n                  table_schema,\n                  table_name,\n                  column_na...       ordinal_position,\n                  comment\n                FROM information_schema.columns\n                '
parameters = {}
context = <sqlalchemy.engine.default.DefaultExecutionContext object at 0x7ff72915ab38>

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)

.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/default.py:470: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

args = (<pyathena.cursor.Cursor object at 0x7ff72915af98>, '\n                SELECT\n                  table_schema,\n      ...  ordinal_position,\n                  comment\n                FROM information_schema.columns\n                ', {})
kwargs = {}

    @functools.wraps(wrapped)
    def _wrapper(*args, **kwargs):
        with _lock:
>           return wrapped(*args, **kwargs)

pyathena/util.py:29: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyathena.cursor.Cursor object at 0x7ff72915af98>
operation = '\n                SELECT\n                  table_schema,\n                  table_name,\n                  column_na...       ordinal_position,\n                  comment\n                FROM information_schema.columns\n                '
parameters = {}

    @synchronized
    def execute(self, operation, parameters=None):
        query = self._formatter.format(operation, parameters)
        _logger.debug(query)
    
        request = self._build_query_execution_request(query)
        try:
            self._reset_state()
            response = retry_api_call(self._connection.start_query_execution,
                                      exceptions=self.retry_exceptions,
                                      attempt=self.retry_attempt,
                                      multiplier=self.retry_multiplier,
                                      max_delay=self.retry_max_deply,
                                      exp_base=self.retry_exponential_base,
                                      logger=_logger,
                                      **request)
        except Exception as e:
            _logger.exception('Failed to execute query.')
            raise_from(DatabaseError(*e.args), e)
        else:
            self._query_id = response.get('QueryExecutionId', None)
>           self._poll()

pyathena/cursor.py:233: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyathena.cursor.Cursor object at 0x7ff72915af98>

    def _poll(self):
        if not self._query_id:
            raise ProgrammingError('QueryExecutionId is none or empty.')
        while True:
            try:
                request = {'QueryExecutionId': self._query_id}
                response = retry_api_call(self._connection.get_query_execution,
                                          exceptions=self.retry_exceptions,
                                          attempt=self.retry_attempt,
                                          multiplier=self.retry_multiplier,
                                          max_delay=self.retry_max_deply,
                                          exp_base=self.retry_exponential_base,
                                          logger=_logger,
                                          **request)
            except Exception as e:
                _logger.exception('Failed to poll query result.')
                raise_from(OperationalError(*e.args), e)
            else:
                query_execution = response.get('QueryExecution', None)
                if not query_execution:
                    raise DataError('KeyError `QueryExecution`')
                status = query_execution.get('Status', None)
                if not status:
                    raise DataError('KeyError `Status`')
    
                state = status.get('State', None)
                if state == 'SUCCEEDED':
                    self._completion_date_time = status.get('CompletionDateTime', None)
                    self._submission_date_time = status.get('SubmissionDateTime', None)
    
                    statistics = query_execution.get('Statistics', {})
                    self._data_scanned_in_bytes = statistics.get(
                        'DataScannedInBytes', None)
                    self._execution_time_in_millis = statistics.get(
                        'EngineExecutionTimeInMillis', None)
    
                    result_conf = query_execution.get('ResultConfiguration', {})
                    self._output_location = result_conf.get('OutputLocation', None)
                    break
                elif state == 'FAILED':
>                   raise OperationalError(status.get('StateChangeReason', None))
E                   pyathena.error.OperationalError: com.facebook.presto.hive.DataCatalogException: Namespace test_pyathena_xy12gnw2tu not found. Please check your query.

pyathena/cursor.py:192: OperationalError

The above exception was the direct cause of the following exception:

self = <tests.test_sqlalchemy_athena.TestSQLAlchemyAthena testMethod=test_get_table_names>
engine = Engine(awsathena+rest://athena.[secure].amazonaws.com:443/test_pyathena_6m87iesn50?s3_staging_dir=[secure])
connection = <sqlalchemy.engine.base.Connection object at 0x7ff72d35eeb8>

    @with_engine
    def test_get_table_names(self, engine, connection):
        meta = MetaData()
>       meta.reflect(bind=engine)

tests/test_sqlalchemy_athena.py:101: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
.tox/py34/lib/python3.4/site-packages/sqlalchemy/sql/schema.py:3909: in reflect
    Table(name, self, **reflect_opts)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/sql/schema.py:439: in __new__
    metadata._remove_table(name, schema)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/util/langhelpers.py:66: in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/util/compat.py:187: in reraise
    raise value
.tox/py34/lib/python3.4/site-packages/sqlalchemy/sql/schema.py:434: in __new__
    table._init(name, metadata, *args, **kw)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/sql/schema.py:514: in _init
    include_columns, _extend_on=_extend_on)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/sql/schema.py:527: in _autoload
    _extend_on=_extend_on
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1534: in run_callable
    return callable_(self, *args, **kwargs)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/default.py:372: in reflecttable
    table, include_columns, exclude_columns, **opts)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/reflection.py:598: in reflecttable
    table_name, schema, **table.dialect_kwargs):
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/reflection.py:369: in get_columns
    **kw)
<string>:2: in get_columns
    ???
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/reflection.py:54: in cache
    ret = fn(self, con, *args, **kw)
pyathena/sqlalchemy_athena.py:145: in get_columns
    } for row in connection.execute(query).fetchall()
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:939: in execute
    return self._execute_text(object, multiparams, params)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1097: in _execute_text
    statement, parameters
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1189: in _execute_context
    context)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1402: in _handle_dbapi_exception
    exc_info
.tox/py34/lib/python3.4/site-packages/sqlalchemy/util/compat.py:203: in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/util/compat.py:186: in reraise
    raise value.with_traceback(tb)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/base.py:1182: in _execute_context
    context)
.tox/py34/lib/python3.4/site-packages/sqlalchemy/engine/default.py:470: in do_execute
    cursor.execute(statement, parameters)
pyathena/util.py:29: in _wrapper
    return wrapped(*args, **kwargs)
pyathena/cursor.py:233: in execute
    self._poll()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyathena.cursor.Cursor object at 0x7ff72915af98>

    def _poll(self):
        if not self._query_id:
            raise ProgrammingError('QueryExecutionId is none or empty.')
        while True:
            try:
                request = {'QueryExecutionId': self._query_id}
                response = retry_api_call(self._connection.get_query_execution,
                                          exceptions=self.retry_exceptions,
                                          attempt=self.retry_attempt,
                                          multiplier=self.retry_multiplier,
                                          max_delay=self.retry_max_deply,
                                          exp_base=self.retry_exponential_base,
                                          logger=_logger,
                                          **request)
            except Exception as e:
                _logger.exception('Failed to poll query result.')
                raise_from(OperationalError(*e.args), e)
            else:
                query_execution = response.get('QueryExecution', None)
                if not query_execution:
                    raise DataError('KeyError `QueryExecution`')
                status = query_execution.get('Status', None)
                if not status:
                    raise DataError('KeyError `Status`')
    
                state = status.get('State', None)
                if state == 'SUCCEEDED':
                    self._completion_date_time = status.get('CompletionDateTime', None)
                    self._submission_date_time = status.get('SubmissionDateTime', None)
    
                    statistics = query_execution.get('Statistics', {})
                    self._data_scanned_in_bytes = statistics.get(
                        'DataScannedInBytes', None)
                    self._execution_time_in_millis = statistics.get(
                        'EngineExecutionTimeInMillis', None)
    
                    result_conf = query_execution.get('ResultConfiguration', {})
                    self._output_location = result_conf.get('OutputLocation', None)
                    break
                elif state == 'FAILED':
>                   raise OperationalError(status.get('StateChangeReason', None))
E                   sqlalchemy.exc.OperationalError: (pyathena.error.OperationalError) com.facebook.presto.hive.DataCatalogException: Namespace test_pyathena_xy12gnw2tu not found. Please check your query. [SQL: '\n                SELECT\n                  table_schema,\n                  table_name,\n                  column_name,\n                  data_type,\n                  is_nullable,\n                  column_default,\n                  ordinal_position,\n                  comment\n                FROM information_schema.columns\n                ']

SQLAlchemy doubles percent sign (%)

(But it can be fixed by setting self._double_percents = False in AthenaIdentifierPreparer __init__)

Broken:

>>> import sqlalchemy
>>> engine = sqlalchemy.create_engine(.....)
>>> session = engine.connect()
>>> sql = "select date_parse('20191030', '%Y%m%d' )"
>>> txt = sqlalchemy.sql.text(sql)
>>> result = session.execute(txt, {})


Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/site-packages/pyathena/util.py", line 28, in _wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/pyathena/cursor.py", line 46, in execute
    raise OperationalError(query_execution.state_change_reason)
pyathena.error.OperationalError: INVALID_FUNCTION_ARGUMENT: Invalid format: "20191030"

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 948, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/sql/elements.py", line 269, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1060, in _execute_clauseelement
    compiled_sql, distilled_params
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1200, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1413, in _handle_dbapi_exception
    exc_info
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 265, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/util/compat.py", line 248, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1193, in _execute_context
    context)
  File "/usr/local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 509, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.6/site-packages/pyathena/util.py", line 28, in _wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/pyathena/cursor.py", line 46, in execute
    raise OperationalError(query_execution.state_change_reason)
sqlalchemy.exc.OperationalError: (pyathena.error.OperationalError) INVALID_FUNCTION_ARGUMENT: Invalid format: "20191030" [SQL: "select date_parse('20191030', '%%Y%%m%%d' )"] (Background on this error at: http://sqlalche.me/e/e3q8)

(See how it has doubled the percent signs in the last line of the traceback?)

Fix by changing sqlalchemy_athena.py to have this:

class AthenaIdentifierPreparer(IdentifierPreparer):
    """PrestoIdentifierPreparer

    https://github.com/dropbox/PyHive/blob/master/pyhive/sqlalchemy_presto.py"""
    reserved_words = UniversalSet()

    def __init__(self, dialect, initial_quote='"',
                 final_quote=None, escape_quote='"',
                 quote_case_sensitive_collations=True, omit_schema=False):
        super(AthenaIdentifierPreparer, self).__init__(dialect, initial_quote, final_quote, escape_quote, quote_case_sensitive_collations, omit_schema)
        self._double_percents = False

Fixed:

>>> import sqlalchemy
>>> engine = sqlalchemy.create_engine(....., echo="debug")
>>> session = engine.connect()
>>> sql = "select date_parse('20191030', '%Y%m%d' )"
>>> txt = sqlalchemy.sql.text(sql)
>>> result = session.execute(txt, {})
2018-11-13 23:22:15,519 INFO sqlalchemy.engine.base.Engine select date_parse('20191030', '%Y%m%d' )
2018-11-13 23:22:15,519 INFO sqlalchemy.engine.base.Engine {}
2018-11-13 23:22:16,813 DEBUG sqlalchemy.engine.base.Engine Col ('_col0',)
>>> result.fetchall()
2018-11-13 23:22:27,171 DEBUG sqlalchemy.engine.base.Engine Row (datetime.datetime(2019, 10, 30, 0, 0),)
[(datetime.datetime(2019, 10, 30, 0, 0),)]
>>> session.execute(sqlalchemy.text("select :word"), {'word':"cat"}).fetchall()
2018-11-13 23:23:27,964 INFO sqlalchemy.engine.base.Engine select %(word)s
2018-11-13 23:23:27,965 INFO sqlalchemy.engine.base.Engine {'word': 'cat'}
2018-11-13 23:23:29,199 DEBUG sqlalchemy.engine.base.Engine Col ('_col0',)
2018-11-13 23:23:29,199 DEBUG sqlalchemy.engine.base.Engine Row ('cat',)
[('cat',)]

pandas.to_sql

Hi, I'd like to check whether there is a plan to add a pandas.to_sql feature with a PyAthena connection?
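In the meantime, the usual manual route is to stage the frame as CSV (or Parquet) in S3 and issue a CREATE EXTERNAL TABLE over it. A minimal sketch of the DDL half of that pattern — the table name, column types, and S3 location below are placeholders, and this is not a PyAthena API:

```python
def athena_csv_ddl(table, columns, s3_location):
    """Build a CREATE EXTERNAL TABLE statement for CSV data already in S3.

    `columns` maps column names to Athena types, e.g. {"id": "bigint"}.
    Sketch of the manual route a to_sql implementation would automate.
    """
    cols = ",\n  ".join(f"{name} {dtype}" for name, dtype in columns.items())
    return (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table} (\n  {cols}\n)\n"
        "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','\n"
        f"LOCATION '{s3_location}'"
    )

# Placeholder database/table and bucket:
ddl = athena_csv_ddl(
    "mydb.events", {"id": "bigint", "name": "string"}, "s3://my-bucket/events/"
)
```

The DataFrame itself would be written to the same S3 prefix (e.g. via `df.to_csv`) before running the DDL through a PyAthena cursor.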

Using WorkGroup in API requests

Hi,

Athena WorkGroup allows you to isolate queries for teams, applications, etc. You can read more about Athena WorkGroups here: https://docs.aws.amazon.com/athena/latest/ug/user-created-workgroups.html. I don't see a way to submit a query in a particular WorkGroup rather than the primary one, which is the default.

from pyathena.connection import Connection
conn = Connection(s3_staging_dir='s3://test/',region_name='us-east-1')
cur = conn.cursor(WorkGroup='test')
cur.execute('select * from test.table2') # this submits query in primary workgroup rather than test
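Until the wrapper exposes a work group option, the underlying boto3 `start_query_execution` call does accept a `WorkGroup` parameter. A sketch that builds the request dict yourself (the helper name and all bucket/database/work-group values are placeholders, not part of PyAthena):

```python
def build_start_query_request(query, database, output_location, work_group):
    """Build keyword arguments for athena_client.start_query_execution().

    boto3's Athena client accepts WorkGroup even though the wrapper
    shown in this issue does not pass it through.
    """
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
        "WorkGroup": work_group,
    }

request = build_start_query_request(
    "SELECT * FROM test.table2", "test", "s3://test/", "test"
)
# athena_client.start_query_execution(**request)  # requires boto3 + credentials
```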

Cancel aborted queries

After pressing Ctrl-C on cur.execute or future.result, a cur.cancel should be sent to the server, since Athena is billed per query and there is no point in paying for the rest of an aborted query.
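A caller-side workaround, assuming the cursor exposes `execute()` and `cancel()` as PyAthena's synchronous cursor does, is to trap the interrupt and cancel explicitly. The fake cursor below exists only to demonstrate the flow:

```python
def execute_with_cancel(cursor, sql):
    """Run a query, cancelling it server-side if the caller interrupts.

    Sketch only: assumes the cursor has execute() and cancel() methods.
    """
    try:
        return cursor.execute(sql)
    except KeyboardInterrupt:
        cursor.cancel()  # stop paying for the aborted query
        raise

class _FakeCursor:
    """Stand-in cursor whose execute() simulates a Ctrl-C."""
    def __init__(self):
        self.cancelled = False
    def execute(self, sql):
        raise KeyboardInterrupt
    def cancel(self):
        self.cancelled = True

fake = _FakeCursor()
try:
    execute_with_cancel(fake, "SELECT 1")
except KeyboardInterrupt:
    pass
```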

querying CommonCrawl parquet archive

I have set up Athena to point at the CommonCrawl parquet archive as described in
http://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/
and I am able to run queries against the created table using the Athena web UI.

When I try to access the table from python using

from blaze import data
from urllib.parse import quote_plus 
from sqlalchemy.engine import create_engine
from sqlalchemy.sql.expression import select
from sqlalchemy.sql.functions import func
from sqlalchemy.sql.schema import Table, MetaData

conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'

engine = create_engine(conn_str.format(
    aws_access_key_id=quote_plus(PUBLIC),
    aws_secret_access_key=quote_plus(PRIVATE),
    region_name='us-east-1',
    schema_name='ccindex',
    s3_staging_dir=quote_plus('s3://commoncrawl/cc-index/table/cc-main/warc/')))

db = data(engine)

db.ccindex.ccindex.peek()

I get


---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)
~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    506     def do_execute(self, cursor, statement, parameters, context=None):
--> 507         cursor.execute(statement, parameters)
    508 

~/anaconda3/lib/python3.6/site-packages/pyathena/util.py in _wrapper(*args, **kwargs)
     28         with _lock:
---> 29             return wrapped(*args, **kwargs)
     30     return _wrapper

~/anaconda3/lib/python3.6/site-packages/pyathena/cursor.py in execute(self, operation, parameters)
    116         else:
--> 117             raise OperationalError(query_execution.state_change_reason)
    118 

OperationalError: Access denied when writing output to url: s3://commoncrawl/cc-index/table/cc-main/warc/3766a047-e63c-4531-b0f7-85a1d7035f9a.csv . Please ensure you are allowed to access the S3 bucket. If you are encrypting query results with KMS key, please ensure you are allowed to access your KMS key

The above exception was the direct cause of the following exception:

OperationalError                          Traceback (most recent call last)
<ipython-input-10-6ce5eec8364d> in <module>()
----> 1 import codecs, os, ast;__pyfile = codecs.open('''/tmp/py3278WRZ''', encoding='''utf-8''');__code = __pyfile.read().encode('''utf-8''');__pyfile.close();os.remove('''/tmp/py3278WRZ''');__block = ast.parse(__code, '''/home/dm/athna.py''', mode='exec');__last = __block.body[-1];__isexpr = isinstance(__last,ast.Expr);__block.body.pop() if __isexpr else None;exec(compile(__block, '''/home/dm/athna.py''', mode='exec'));eval(compile(ast.Expression(__last.value), '''/home/dm/athna.py''', mode='eval')) if __isexpr else None

~/athna.py in <module>()
     24     s3_staging_dir=quote_plus('s3://commoncrawl/cc-index/table/cc-main/warc/')))
     25 
---> 26 db = data(engine)
     27 
     28 db.peek()

~/anaconda3/lib/python3.6/site-packages/blaze/interactive.py in data(data_source, dshape, name, fields, schema, **kwargs)
    161         data_source = tuple(data_source)
    162     if not dshape:
--> 163         dshape = discover(data_source)
    164         types = None
    165         if isinstance(dshape.measure, Tuple) and fields:

~/anaconda3/lib/python3.6/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    162             self._cache[types] = func
    163         try:
--> 164             return func(*args, **kwargs)
    165 
    166         except MDNotImplementedError:

~/anaconda3/lib/python3.6/site-packages/odo/backends/sql.py in discover(engine)
    291 @dispatch(sa.engine.base.Engine)
    292 def discover(engine):
--> 293     return discover(metadata_of_engine(engine))
    294 
    295 

~/anaconda3/lib/python3.6/site-packages/multipledispatch/dispatcher.py in __call__(self, *args, **kwargs)
    162             self._cache[types] = func
    163         try:
--> 164             return func(*args, **kwargs)
    165 
    166         except MDNotImplementedError:

~/anaconda3/lib/python3.6/site-packages/odo/backends/sql.py in discover(metadata)
    297 def discover(metadata):
    298     try:
--> 299         metadata.reflect(views=metadata.bind.dialect.supports_views)
    300     except NotImplementedError:
    301         metadata.reflect()

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/sql/schema.py in reflect(self, bind, schema, views, only, extend_existing, autoload_replace, **dialect_kwargs)
   3924 
   3925             available = util.OrderedSet(
-> 3926                 bind.engine.table_names(schema, connection=conn))
   3927             if views:
   3928                 available.update(

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in table_names(self, schema, connection)
   2137             if not schema:
   2138                 schema = self.dialect.default_schema_name
-> 2139             return self.dialect.get_table_names(conn, schema)
   2140 
   2141     def has_table(self, table_name, schema=None):

<string> in get_table_names(self, connection, schema, **kw)

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/reflection.py in cache(fn, self, con, *args, **kw)
     40     info_cache = kw.get('info_cache', None)
     41     if info_cache is None:
---> 42         return fn(self, con, *args, **kw)
     43     key = (
     44         fn.__name__,

~/anaconda3/lib/python3.6/site-packages/pyathena/sqlalchemy_athena.py in get_table_names(self, connection, schema, **kw)
    122                 WHERE table_schema = '{schema}'
    123                 """.format(schema=schema)
--> 124         return [row.table_name for row in connection.execute(query).fetchall()]
    125 
    126     def has_table(self, connection, table_name, schema=None):

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in execute(self, object, *multiparams, **params)
    940         """
    941         if isinstance(object, util.string_types[0]):
--> 942             return self._execute_text(object, multiparams, params)
    943         try:
    944             meth = object._execute_on_connection

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_text(self, statement, multiparams, params)
   1102             statement,
   1103             parameters,
-> 1104             statement, parameters
   1105         )
   1106         if self._has_events or self.engine._has_events:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1198                 parameters,
   1199                 cursor,
-> 1200                 context)
   1201 
   1202         if self._has_events or self.engine._has_events:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception(self, e, statement, parameters, cursor, context)
   1411                 util.raise_from_cause(
   1412                     sqlalchemy_exception,
-> 1413                     exc_info
   1414                 )
   1415             else:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
    201     exc_type, exc_value, exc_tb = exc_info
    202     cause = exc_value if exc_value is not exception else None
--> 203     reraise(type(exception), exception, tb=exc_tb, cause=cause)
    204 
    205 if py3k:

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    184             value.__cause__ = cause
    185         if value.__traceback__ is not tb:
--> 186             raise value.with_traceback(tb)
    187         raise value
    188 

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _execute_context(self, dialect, constructor, statement, parameters, *args)
   1191                         statement,
   1192                         parameters,
-> 1193                         context)
   1194         except BaseException as e:
   1195             self._handle_dbapi_exception(

~/anaconda3/lib/python3.6/site-packages/sqlalchemy/engine/default.py in do_execute(self, cursor, statement, parameters, context)
    505 
    506     def do_execute(self, cursor, statement, parameters, context=None):
--> 507         cursor.execute(statement, parameters)
    508 
    509     def do_execute_no_params(self, cursor, statement, context=None):

~/anaconda3/lib/python3.6/site-packages/pyathena/util.py in _wrapper(*args, **kwargs)
     27     def _wrapper(*args, **kwargs):
     28         with _lock:
---> 29             return wrapped(*args, **kwargs)
     30     return _wrapper
     31 

~/anaconda3/lib/python3.6/site-packages/pyathena/cursor.py in execute(self, operation, parameters)
    115                 self.retry_max_delay, self.retry_exponential_base)
    116         else:
--> 117             raise OperationalError(query_execution.state_change_reason)
    118 
    119     def executemany(self, operation, seq_of_parameters):

OperationalError: (pyathena.error.OperationalError) Access denied when writing output to url: s3://commoncrawl/cc-index/table/cc-main/warc/3766a047-e63c-4531-b0f7-85a1d7035f9a.csv . Please ensure you are allowed to access the S3 bucket. If you are encrypting query results with KMS key, please ensure you are allowed to access your KMS key [SQL: "\n                SELECT table_name\n                FROM information_schema.tables\n                WHERE table_schema = 'ccindex'\n                "] (Background on this error at: http://sqlalche.me/e/e3q8)
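The error message points at the cause: `s3_staging_dir` is where Athena writes query results, so it must be a bucket your credentials can write to — not the read-only public commoncrawl bucket. A corrected engine setup might look like this (the results bucket name is a placeholder):

```python
from urllib.parse import quote_plus

conn_str_template = (
    "awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}"
    "@athena.{region_name}.amazonaws.com:443/"
    "{schema_name}?s3_staging_dir={s3_staging_dir}"
)
conn_str = conn_str_template.format(
    aws_access_key_id=quote_plus("PUBLIC"),
    aws_secret_access_key=quote_plus("PRIVATE"),
    region_name="us-east-1",
    schema_name="ccindex",
    # Must be a writable bucket in your own account, NOT s3://commoncrawl/...:
    s3_staging_dir=quote_plus("s3://my-athena-results-bucket/staging/"),
)
# engine = create_engine(conn_str)  # requires sqlalchemy + pyathena installed
```

The table itself can still live in the public commoncrawl bucket; only the query output location needs to change.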

Handle InvalidRequestException errors raised

I have a script using Athena + SQLAlchemy to run a query and have the results of that query read in as a pandas DataFrame. However, for some queries (which are long), I get this error:

botocore.errorfactory.InvalidRequestException: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: Your query has exceeded the maximum query length of 262144 bytes. Please reduce the length of your query and try again.  If you continue to see this issue after reducing your query length,  contact customer support for further assistance.

But I can't do this

try:
   run_query()
except botocore.errorfactory.InvalidRequestException:
   run_query_differently()

because it says

AttributeError: module 'botocore.errorfactory' has no attribute 'InvalidRequestException'

The way it's usually handled is to use client.exceptions.InvalidRequestException, but that requires access to the same client that was used to run the query.

Any ideas on how to to do this using the connections that PyAthena creates?

Cursor Skips First Row When Listing Partitions

Executing SHOW PARTITIONS {table_name} skips the first row due to this line of code:
https://github.com/laughingman7743/PyAthena/blob/master/pyathena/cursor.py#L187

The issue is really with the AWS API, which returns the column header as the first row of the data for SELECT queries. Ideally, the fix would be for them to stop returning the label as if it were part of the data set. It is not part of the data set, and they already return the column labels in the ResultSetMetadata.

One possible solution is to check for the next token and, if the first value equals the column label (or column name), skip that row (make the offset 1). I am not a fan of that, as it relies on a magic value, but it would be a decent short-term fix until Amazon fixes the API.
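The short-term fix suggested in this issue could look roughly like the following — a sketch against plain lists, not the actual boto3 response shape, and the column labels used are illustrative:

```python
def data_rows(rows, column_labels, is_first_page):
    """Drop the leading row only when it repeats the column labels.

    `rows` is the raw result for one page; `column_labels` comes from the
    result set metadata. SHOW PARTITIONS pages don't echo the header, so
    their first row is kept.
    """
    offset = 1 if is_first_page and rows and rows[0] == column_labels else 0
    return rows[offset:]

# SELECT-style first page: header echoed as the first row by the API
select_page = [["col"], ["a"], ["b"]]
# SHOW PARTITIONS-style first page: no header echo
partitions_page = [["dt=2017-01-01"], ["dt=2017-01-02"]]
```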

SQLAlchemy with temporary IAM Role credentials

I am using PyAthena + SQLAlchemy to connect to athena. It is being used with IAM Role credentials, which are temporary and expire (after 12 hours). After that time has passed, I get the following error: (pyathena.error.DatabaseError) An error occurred (ExpiredTokenException) when calling the StartQueryExecution operation: The security token included in the request is expired

I am passing in the following connection string:

    session = boto3.Session()
    credentials = session.get_credentials()
    connection_string = (
        f'awsathena+rest://{quote_plus(credentials.access_key)}:'
        f'{quote_plus(credentials.secret_key)}@athena.{region}.'
        f'amazonaws.com:445/{database}?s3_staging_dir='
        f'{quote_plus(s3_staging_dir)}'
        f';aws_session_token={quote_plus(credentials.token)}'
    )

It doesn't look like there's currently a way for the connection to fetch new credentials once they expire. I've solved this by fetching the credentials when they aren't provided:
master...leahein:fix-get-creds

New connection string:

  connection_string = (
        f'awsathena+rest://:@athena.{region}.'
        f'amazonaws.com:445/{database}?s3_staging_dir='
        f'{quote_plus(s3_staging_dir)}'
    )

Then, I configure sqlalchemy to recycle the connection before 12 hours, so we are guaranteed a new connection will be made before the credentials expire.

I'd be happy to submit a pull request for this, or hear whether there's any way to configure the connection so that temporary credentials don't eventually error out.

boto3 and botocore

This is a fantastic piece of work! One minor issue regarding using this in a Lambda function.

Packaging boto3 and botocore into a Python Lambda function is not a good idea due to the rate of change that the AWS API experiences. What happens is that a long-lived Lambda function will eventually start failing in weird ways due to incompatibilities between the old, bundled boto and the current API.

I'm trying to use your package in a lambda function, but the resulting Lambda package is (obviously) pulling in the boto stuff when I include your stuff.

Would it be too much to ask for you to remove the boto stuff from the install_requires portion, and either move it to extras_requires or simply document that you require it?
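For illustration, the split being requested might look like this in setup.py — the package names and version pins here are assumptions for the sketch, not PyAthena's actual metadata:

```python
# Hypothetical setup.py fragment: boto moves out of install_requires
# into an optional extra, installed with `pip install pyathena[boto]`.
install_requires = ['tenacity>=4.1.0']
extras_require = {
    'boto': ['boto3>=1.4.4', 'botocore>=1.5.52'],
}
```

With that split, a Lambda deployment can install the bare package and rely on the boto3/botocore versions already present in the Lambda runtime.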

I'm happy to do a PR for this if you want me to, or fork and modify (I don't prefer this).

Your consideration is much appreciated!
-Joe

AttributeError: 'OperationalError' object has no attribute 'response'

[2017-06-02 04:53:21,643][PID:13][ERROR][pyathena.cursor] Failed to execute query.
Traceback (most recent call last):
  File "/home/xxx/.local/lib/python2.7/site-packages/pyathena/cursor.py", line 217, in execute
    **request)
  File "/home/xxx/.local/lib/python2.7/site-packages/pyathena/util.py", line 44, in retry_api_call
    return retry(func, *args, **kwargs)
  File "/home/xxx/.local/lib/python2.7/site-packages/tenacity/__init__.py", line 235, in call
    do = self.iter(result=result, exc_info=exc_info)
  File "/home/xxx/.local/lib/python2.7/site-packages/tenacity/__init__.py", line 185, in iter
    retry = self.retry(fut)
  File "/home/xxx/.local/lib/python2.7/site-packages/tenacity/retry.py", line 63, in __call__
    return self.predicate(attempt.exception())
  File "/home/xxx/.local/lib/python2.7/site-packages/pyathena/util.py", line 38, in <lambda>
    lambda e: e.response.get('Error', {}).get('Code', None) in exceptions),
AttributeError: 'OperationalError' object has no attribute 'response'
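The traceback shows the retry predicate assuming every exception carries a botocore-style `.response` dict, which PyAthena's own OperationalError does not. A hedged sketch of a fix (the function name, fake exception class, and the exact retryable codes are illustrative):

```python
# Hypothetical fix for the retry predicate: only read `.response` when the
# exception actually has one (botocore's ClientError does; a plain
# OperationalError does not), instead of assuming the attribute exists.
RETRY_CODES = ('ThrottlingException', 'TooManyRequestsException')

def is_retryable(exc):
    response = getattr(exc, 'response', None) or {}
    return response.get('Error', {}).get('Code') in RETRY_CODES

class FakeClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError."""
    def __init__(self, code):
        super().__init__(code)
        self.response = {'Error': {'Code': code}}

print(is_retryable(FakeClientError('ThrottlingException')))  # True
print(is_retryable(ValueError('no .response attribute')))    # False
```

With `getattr`, an exception without `.response` simply counts as non-retryable and propagates, instead of raising a second AttributeError inside the retry machinery.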

Using RLock makes it unsuitable for multithreading

Hi,
I've developed an application that uses ThreadPoolExecutor to execute queries against Athena in parallel to speed it up, but with the current implementation of the library it's not possible to issue requests to Athena concurrently. This is because the methods in cursor.py are decorated with @synchronized, which uses an RLock and blocks concurrent threads from making requests at the same time, even though they have separate Cursor instances and, in consequence, separate boto3 sessions.
I think it might be a good idea to consider supporting concurrent requests, especially because the implementation appears to be thread-safe (as documented).
What's the reason for making the methods @synchronized in this case?
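The serialization described above is easy to reproduce with a stripped-down stand-in for the decorator (the names here are illustrative, not PyAthena's actual code): a single shared RLock means that separate Cursor instances on separate threads still run one at a time.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_lock = threading.RLock()  # one shared lock, as with a module-level @synchronized

def synchronized(fn):
    def wrapper(*args, **kwargs):
        with _lock:            # every call, on every instance, takes this lock
            return fn(*args, **kwargs)
    return wrapper

class Cursor:
    @synchronized
    def execute(self):
        time.sleep(0.1)        # stand-in for an Athena round trip

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(lambda c: c.execute(), [Cursor() for _ in range(4)]))
elapsed = time.time() - start
# Despite four threads and four Cursor instances, the calls ran one at a
# time, so elapsed is roughly 4 x 0.1s rather than ~0.1s.
```

Moving the lock to a per-instance attribute (or dropping it where boto3's own thread safety suffices) would let independent cursors overlap their requests.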

Type code contract broken

The DBAPI specifies that

The type_code must compare equal to one of Type Objects defined below

https://www.python.org/dev/peps/pep-0249/#type-objects-and-constructors

But I think there is a case problem here: the description in the cursor contains lowercase strings such as integer, while NUMBER is defined as:

 NUMBER = DBAPITypeObject('BOOLEAN', 'TINYINT', 'SMALLINT', 'BIGINT', 'INTEGER', 
                          'REAL', 'DOUBLE', 'FLOAT', 'DECIMAL', 'NUMERIC')   

so it doesn't compare equal to the value in the description.
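The mismatch can be demonstrated with the type-object constructor suggested in PEP 249 (the `__eq__`-based sketch below is an assumption about the implementation; the point is only that comparison is a plain membership test, so case matters):

```python
# PEP 249-style type object: equality is membership in the listed names,
# so a lowercase type_code such as 'integer' never matches NUMBER.
class DBAPITypeObject:
    def __init__(self, *values):
        self.values = values

    def __eq__(self, other):
        return other in self.values

NUMBER = DBAPITypeObject('BOOLEAN', 'TINYINT', 'SMALLINT', 'BIGINT', 'INTEGER',
                         'REAL', 'DOUBLE', 'FLOAT', 'DECIMAL', 'NUMERIC')

print(NUMBER == 'INTEGER')  # True
print(NUMBER == 'integer')  # False: the case mismatch breaks the contract
```

Either normalizing the type_code to uppercase in the description, or listing lowercase names in the type objects, would restore the contract.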

'Engine' object has no attribute 'connection' when calling SQLAlchemy's `get_columns` with an `Engine`.

Hey @laughingman7743,

First of all, thanks a ton for putting together this nice Python wrapper for AWS Athena and also including a SQLAlchemy interface!

Sadly, when trying it out with Superset, I ran into the following issue:

  File "/root/venv/lib/python3.5/site-packages/pyathena/sqlalchemy_athena.py", line 187, in get_columns
    stop=stop_after_attempt(connection.connection.retry_attempt),
AttributeError: 'Engine' object has no attribute 'connection'

The trouble is that PyAthena expects a Connection in its get_columns interface (https://github.com/laughingman7743/PyAthena/blob/master/pyathena/sqlalchemy_athena.py#L157). In particular, it expects that connection.connection will exist, which sadly is not the case when connection is an Engine.

I am not familiar enough with SQLAlchemy to comment on what should be done differently, but this greatly hampers the usability of PyAthena with Superset. My current fix is to use PyAthenaJDBC (thank you for putting that together as well!), which is significantly slower.

Thus, I would like to see what can be done to fix this in PyAthena, as it otherwise works very well.

I am also happy to put together a PR with the fix -- I'd just like to know your thoughts on what approach would work best with the overall architecture.

To make testing simpler, I've also put together a minimal example which demonstrates this issue.

from urllib.parse import quote_plus  # PY2: from urllib import quote_plus
from sqlalchemy.engine import create_engine
import sqlalchemy

conn_str = 'awsathena+rest://{aws_access_key_id}:{aws_secret_access_key}@athena.{region_name}.amazonaws.com:443/'\
           '{schema_name}?s3_staging_dir={s3_staging_dir}'
engine = create_engine(conn_str.format(
    aws_access_key_id=quote_plus('AWS_ACCESS_KEY_ID'),
    aws_secret_access_key=quote_plus('AWS_SECRET_ACCESS_KEY'),
    region_name='eu-west-1',
    schema_name='default',
    s3_staging_dir=quote_plus('s3://some-bucket')))
inspector = sqlalchemy.inspect(engine)
print(inspector.get_columns('table_name', 'schema_name'))

Thanks a ton again!
