elasticsearch-py-async's Introduction

Python Elasticsearch Async Client

NOTE: This repository is deprecated; do not use it for Elasticsearch 7.x or later.

See this issue about the deprecation for more information.

What is this library?

This is an adapter for elasticsearch-py providing a transport layer based on Python's asyncio module. All API calls now return a future wrapping the response.

Sniffing (when requested) is also done via a scheduled coroutine.

Example for Python 3.5+

import asyncio
from elasticsearch_async import AsyncElasticsearch

client = AsyncElasticsearch(hosts=['localhost', 'other-host'])

async def print_info():
    info = await client.info()
    print(info)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_info())
loop.run_until_complete(client.transport.close())
loop.close()

Example using the client as an async context manager (Python 3.5+)

import asyncio
from elasticsearch_async import AsyncElasticsearch
hosts = ['localhost', 'other-host']

async def print_info():
    async with AsyncElasticsearch(hosts=hosts) as client:
        print(await client.info())

loop = asyncio.get_event_loop()
loop.run_until_complete(print_info())
loop.close()

Example with SSL Context

import asyncio
from elasticsearch_async import AsyncElasticsearch
from elasticsearch.connection.http_urllib3 import create_ssl_context

context = create_ssl_context(cafile="/certs/ca/ca.crt")

client = AsyncElasticsearch(
    hosts=['elasticsearch-xpack'],
    ssl_context=context,
    http_auth=('elastic', 'changeme')
)

@asyncio.coroutine
def print_info():
    info = yield from client.info()
    print(info)

loop = asyncio.get_event_loop()
loop.run_until_complete(print_info())
loop.run_until_complete(client.transport.close())
loop.close()

AsyncElasticsearch introduces one extra parameter, loop, which can be used to pass in the event loop you wish the client to use. By default asyncio.get_event_loop() will be used.
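
For instance, a minimal sketch (host name assumed) that creates its own loop and hands it to the client, mirroring the example above:

import asyncio
from elasticsearch_async import AsyncElasticsearch

# Minimal sketch: create an explicit event loop and hand it to the client.
loop = asyncio.new_event_loop()
client = AsyncElasticsearch(hosts=['localhost'], loop=loop)

async def print_info():
    print(await client.info())

loop.run_until_complete(print_info())
loop.run_until_complete(client.transport.close())
loop.close()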

Installation

elasticsearch-async is available on PyPI, so you can install it using pip:

pip install elasticsearch-async

License

Copyright 2015 Elasticsearch

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

elasticsearch-py-async's People

Contributors

ciscorn, cooperlees, fxdgear, hanula, honzakral, hroncok, jvcop, jwtrhs, mosquito, sethmlarson, stickperson


elasticsearch-py-async's Issues

Asyncio.run Not Supported

I tried to use the Python 3.7 approach for asyncio via asyncio.run() and not loop.run_until_complete(). This produces the following error:

RuntimeError: Task <Task pending coro=<query_elasticsearch() running at <ipython-input-10-b627b3d0f9d9>:23> cb=[as_completed.<locals>._on_completion() at /miniconda3.7/lib/python3.7/asyncio/tasks.py:520]> got Future <Task pending coro=<AsyncTransport.main_loop() running at /lib/python3.7/site-packages/elasticsearch_async/transport.py:143>> attached to a different loop

If this is not a bug but just not supported yet, are there plans to include it? Thanks!
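
As a stopgap sketch (not an official fix, host name assumed): constructing the client inside the coroutine that asyncio.run() executes keeps the transport on the loop that asyncio.run() creates.

import asyncio
from elasticsearch_async import AsyncElasticsearch

async def main():
    # Built inside the running coroutine, so the transport picks up the loop
    # created by asyncio.run() instead of an earlier default loop.
    client = AsyncElasticsearch(hosts=['localhost'])
    try:
        print(await client.info())
    finally:
        await client.transport.close()

asyncio.run(main())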

Incompatible with aiohttp>=3.6

Hello,
elasticsearch_async is no longer compatible with aiohttp since version 3.6, due to the removal of the explicit loop parameter:

Traceback (most recent call last):
  ... 
    self.client = elasticsearch_async.AsyncElasticsearch(hosts=es_addresses)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch_async-6.2.0-py3.7.egg/elasticsearch_async/__init__.py", line 8, in __init__
    super().__init__(hosts, transport_class=transport_class, **kwargs)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch-7.5.1-py3.7.egg/elasticsearch/client/__init__.py", line 227, in __init__
    self.transport = transport_class(_normalize_hosts(hosts), **kwargs)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch_async-6.2.0-py3.7.egg/elasticsearch_async/transport.py", line 23, in __init__
    connection_pool_class=connection_pool_class, **kwargs)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch-7.5.1-py3.7.egg/elasticsearch/transport.py", line 129, in __init__
    self.set_connections(hosts)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch_async-6.2.0-py3.7.egg/elasticsearch_async/transport.py", line 56, in set_connections
    super().set_connections(hosts)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch-7.5.1-py3.7.egg/elasticsearch/transport.py", line 179, in set_connections
    connections = list(zip(connections, hosts))
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch-7.5.1-py3.7.egg/elasticsearch/transport.py", line 175, in _create_connection
    return self.connection_class(**kwargs)
  File "/home/ibt23sec5/projects/urpp/venv/lib/python3.7/site-packages/elasticsearch_async-6.2.0-py3.7.egg/elasticsearch_async/connection.py", line 73, in __init__
    ssl_context=ssl_context,
TypeError: __init__() got an unexpected keyword argument 'loop'
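
Until this is addressed, a common stopgap (assuming you are able to pin dependencies) is to keep aiohttp below 3.6:

pip install "aiohttp<3.6"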

How can I use a proxy to connect to Elasticsearch?

I am trying to connect to an Elasticsearch cluster from behind a corporate proxy, maybe with something like:

from elasticsearch import Elasticsearch, RequestsHttpConnection


class MyConnection(RequestsHttpConnection):
    def __init__(self, *args, **kwargs):
        proxies = kwargs.pop("proxies", {})
        super(MyConnection, self).__init__(*args, **kwargs)
        self.session.proxies = proxies


# `proxies` needs to be defined before the client is created; placeholder example:
proxies = {"https": "http://proxy.example.com:8080"}

es = Elasticsearch(
    ["https://url_to_elasticsearch"],
    connection_class=MyConnection,
    proxies=proxies,
    use_ssl=True,
    http_auth=("username", "password"),
)

Non-async Elasticsearch kwargs sniffer_timeout should be dropped

Hello,

After switching from classic driver to async one I kept the same init kwargs:

self.conn = AsyncElasticsearch(
    hosts=self.hosts,
    # http://elasticsearch-py.readthedocs.io/en/master/api.html?highlight=search#elasticsearch
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    sniffer_timeout=60
)

This setting gives the traceback below. Removing sniffer_timeout makes it work again. As sniffing is probably handled completely differently in the async driver, I suggest dropping it from the kwargs passed to super().

 File "/usr/lib/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 304, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/transport.py", line 136, in main_loop
    connection = self.get_connection()
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/transport.py", line 53, in get_connection
    self.initiate_sniff()
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/transport.py", line 35, in initiate_sniff
    self.sniffing_task.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/transport.py", line 116, in sniff_hosts
    node_info = yield from self._get_sniff_data(initial)
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/transport.py", line 86, in _get_sniff_data
    _, headers, node_info = t.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 293, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/lib/python3/dist-packages/elasticsearch_async2/connection.py", line 55, in perform_request
    response = yield from self.session.request(method, url, data=body)
  File "/usr/lib/python3/dist-packages/aiohttp/helpers.py", line 91, in __iter__
    ret = yield from self._coro
  File "/usr/lib/python3/dist-packages/aiohttp/client.py", line 168, in _request
    raise RuntimeError('Session is closed')
RuntimeError: Session is closed
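
In the meantime, a workaround sketch based on the report above (host names are placeholders): pass only the sniff flags and leave sniffer_timeout out.

from elasticsearch_async import AsyncElasticsearch

conn = AsyncElasticsearch(
    hosts=['localhost'],
    sniff_on_start=True,
    sniff_on_connection_fail=True,
    # sniffer_timeout omitted: with it set, the scheduled sniff ends up using
    # a closed session ("RuntimeError: Session is closed" above).
)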

Package status

Is there any plan to improve this package or merge it into the main elasticsearch package?

Connect elasticsearch with ipv6 address

When I connect to my Elasticsearch with an IPv6 address like this:

ES_CLIENT = AsyncElasticsearch(
    hosts=['http://[2002:ac1f:91c8::127.0.0.1]:9200'],
    timeout=120,
    sniff_on_start=True,
    retry_on_timeout=True,
    max_retries=5,
    loop=self.loop
)

The connection fails because the address gets converted to 2002:ac1f:91c8::127.0.0.1, and aiohttp can't establish a connection with that kind of address.

Given this situation, I have prepared a pull request in case you consider this a bug; otherwise, how can I connect to my Elasticsearch with an IP address like that?

Issues with Python 3.5

Hi,

I am trying to run the example from the docs with Python 3.5 and elasticsearch-py (5.0, I think) from the master branch, and it does not seem to be working.

I keep getting:
AssertionError: yield from wasn't used with future

Not sure what the problem is. Detailed stack trace below:

Traceback (most recent call last):
  File "/home/arti/workdir/man/services/imp/search/query/test.py", line 12, in <module>
    loop.run_until_complete(print_info())
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/asyncio/base_events.py", line 337, in run_until_complete
    return future.result()
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/home/arti/workdir/man/services/imp/search/query/test.py", line 8, in print_info
    info = yield from client.info()
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/site-packages/elasticsearch/client/utils.py", line 69, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/site-packages/elasticsearch/client/__init__.py", line 219, in info
    _, data = self.transport.perform_request('GET', '/', params=params)
  File "/opt/anaconda3/envs/imp_search/lib/python3.5/asyncio/futures.py", line 359, in __iter__
    assert self.done(), "yield from wasn't used with future"
AssertionError: yield from wasn't used with future
Task was destroyed but it is pending!
task: <Task pending coro=<TCPConnector._create_connection() running at /opt/anaconda3/envs/imp_search/lib/python3.5/site-packages/aiohttp/connector.py:558> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /opt/anaconda3/envs/imp_search/lib/python3.5/asyncio/tasks.py:344]>
Task was destroyed but it is pending!
task: <Task pending coro=<AsyncTransport.main_loop() running at /opt/anaconda3/envs/imp_search/lib/python3.5/site-packages/elasticsearch_async/transport.py:140> wait_for=<Future pending cb=[Task._wakeup()]>>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f208fb83240>

How to pass querystring params to search?

Hi,

I am trying to pass the following parameter to some of my queries (during tests, to ensure consistency of scoring): ?search_type=dfs_query_then_fetch (as instructed in https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html#request-body-search-search-type)

What's the best way to pass this along to the search method? I see params in there, but from the code in transport.py (https://github.com/elastic/elasticsearch-py-async/blob/master/elasticsearch_async/transport.py#L187-L197) it is not clear whether the contents of params get serialized into the query string of the URL when a body is present.

Thank you in advance for any pointers!
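
For reference, a sketch (to be run inside a coroutine) of what I would expect to work, assuming the async client forwards query parameters the same way the synchronous elasticsearch-py client does; index name and query are placeholders:

resp = await client.search(
    index='my-index',
    body={'query': {'match_all': {}}},
    search_type='dfs_query_then_fetch',  # assumed to be forwarded to the query string
)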

Bulk Helpers

Is there a way to call an async version of the bulk helpers? I couldn't find away to do this when looking around.
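
The synchronous helpers.bulk() cannot await the async client, so a minimal sketch (index name and document shape assumed) is to build the newline-delimited bulk body yourself and call client.bulk directly:

import json

async def bulk_index(client, docs, index='logs', doc_type='doc'):
    # Build the newline-delimited bulk body by hand; each action line is
    # followed by its document source.
    lines = []
    for doc in docs:
        lines.append(json.dumps({'index': {}}))
        lines.append(json.dumps(doc))
    body = '\n'.join(lines) + '\n'
    return await client.bulk(body=body, index=index, doc_type=doc_type)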

RuntimeError: Task got bad yield: 200

I'm getting a Task got bad yield: 200 error while using the bulk method. I've tried this in different ways (e.g. without my own generator function). I've also tried an unofficial Helpers class that someone wrote, and I get the same error.

Here is the stack trace:

File "/Users/brooks.isoldi/git/Futures/css/ingest/PythonElasticsearchIngest/venv/lib/python3.6/site-packages/elasticsearch_async/transport.py", line 150, in main_loop
    method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
RuntimeError: Task got bad yield: 200

Below is the relevant code:

# Imports and logger filled in so the snippet is self-contained.
import asyncio
import gzip
import json
import logging
import traceback

import boto3
from elasticsearch import RequestsHttpConnection
from elasticsearch_async import AsyncElasticsearch
from assume_role_aws4auth import AssumeRoleAWS4Auth

logger = logging.getLogger(__name__)

credentials = boto3.Session().get_credentials()
awsauth = AssumeRoleAWS4Auth(credentials, 'us-east-1', 'es')
event_loop = asyncio.get_event_loop()
es_client = AsyncElasticsearch(hosts=['https://MY-ES_HOST'], http_compress=True, http_auth=awsauth, use_ssl=True,
                               verify_certs=True, connection_class=RequestsHttpConnection, loop=event_loop)


def read_chunk(file_path: str, max_batch_size: int, max_records: int):
    actions: str = ''
    actions_size: int = 0
    num_actions: int = 0
    with gzip.open(file_path, 'rt') as f:
        for line in f:
            request = json.dumps(dict({'index': dict({})})) + '\n' + line + '\n'
            request_size = len(request.encode('utf-8'))

            # Check to see if this record will put us over the limits
            if (actions_size + request_size) > max_batch_size or num_actions == max_records:
                yield actions
                actions = ''
                num_actions = 0
                actions_size = 0

            # Add the record
            actions += request
            num_actions += 1
            actions_size += request_size

    if actions != '':
        yield actions


async def process(filename: str):
    for action_chunk in read_chunk(filename, 10000000, 500):
        try:
            resp = await es_client.bulk(body=action_chunk, index='logs', doc_type='doc', _source=False)
            logger.error(resp)
        except Exception as ex:
            logger.error('Found an exception')
            logger.error(''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)))
        await asyncio.sleep(.1)


event_loop.run_until_complete(process(filename))
pending = asyncio.Task.all_tasks()
event_loop.run_until_complete(asyncio.gather(*pending))

Any thoughts?
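
A likely culprit, judging from the traceback: RequestsHttpConnection is a synchronous connection class, so the async transport ends up doing yield from on its plain (status, headers, body) return value, and the first yielded item is the 200 status. A sketch that keeps the default aiohttp-based connection instead (whether the AWS auth object works with it is not confirmed here):

es_client = AsyncElasticsearch(
    hosts=['https://MY-ES_HOST'],
    http_compress=True,
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    loop=event_loop,
    # connection_class is left at the default AIOHttpConnection; passing the
    # synchronous RequestsHttpConnection appears to be what produces
    # "Task got bad yield".
)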

Proper way to create the client: inside each method or one for the app?

I can’t find any documentation on this and it’s hard to tell from the code sample.

Is it preferred to create the Elasticsearch client at the app level and re-use it for the duration of the app, or is it better to create a new client for every method? I am using this in a webapp that makes frequent ES requests.

Thanks!
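
For what it's worth, a sketch of the app-level pattern (assuming an aiohttp web application; host name is a placeholder):

from aiohttp import web
from elasticsearch_async import AsyncElasticsearch

async def init_es(app):
    # One client for the whole app, created once the loop is running.
    app['es'] = AsyncElasticsearch(hosts=['localhost'])

async def close_es(app):
    # Close the transport (and its aiohttp session) on shutdown.
    await app['es'].transport.close()

app = web.Application()
app.on_startup.append(init_es)
app.on_cleanup.append(close_es)
# web.run_app(app) would start the server.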

ERROR: Creating a client session outside of coroutine

I'm using elasticsearch-py-async in a web application. The client gets instantiated during startup and is shared across requests. The whole startup process is synchronous, hence, there is no loop involved. As a result I get an "ERROR: Creating a client session outside of coroutine".

My understanding is that no connection or session should be opened during the creation of the client. How am I supposed to do this correctly?

Import failed

I tried to import:

from elasticsearch_async import AsyncElasticsearch

and it gives this error:

from aiohttp.client_exceptions import ServerFingerprintMismatch
ModuleNotFoundError: No module named 'aiohttp.client_exceptions'
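
aiohttp.client_exceptions was, as far as I know, introduced in aiohttp 2.0, so a likely fix (assuming an older aiohttp is the cause) is simply upgrading it:

pip install --upgrade "aiohttp>=2.0"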

Python 3.7.0b4 async keyword incompatibility

import: 'elasticsearch_async'
Traceback (most recent call last):
  File "/opt/conda/conda-bld/elasticsearch-async_1525571278323/test_tmp/run_test.py", line 2, in <module>
    import elasticsearch_async
  File "/opt/conda/conda-bld/elasticsearch-async_1525571278323/_test_env_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_/lib/python3.7/site-packages/elasticsearch_async/__init__.py", line 1, in <module>
    from .transport import AsyncTransport
  File "/opt/conda/conda-bld/elasticsearch-async_1525571278323/_test_env_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_/lib/python3.7/site-packages/elasticsearch_async/transport.py", line 9, in <module>
    from .helpers import ensure_future
  File "/opt/conda/conda-bld/elasticsearch-async_1525571278323/_test_env_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_placehold_/lib/python3.7/site-packages/elasticsearch_async/helpers.py", line 3
    ensure_future = getattr(asyncio, 'ensure_future', asyncio.async)
                                                                  ^

Add a note or constraint for elasticsearch 5 compatibility

This library can partially be used with elasticsearch-py >= 5.0.0, but it breaks during error handling: the method signatures for log_request_fail differ.

elasticsearch_async/connection.py:

self.log_request_fail(method, url, body, self.loop.time() - start, exception=e)

versus /elasticsearch/connection/base.py:

log_request_fail(self, method, full_url, path, body, duration, status_code=None, response=None, exception=None):

Note the newly added 'path' argument.

You may want to update the setup.py (elasticsearch <= 5.0.0) or leave a note.

Btw. this library is great work!

Release a ES6.0 compatible version

Hi there,

We are moving to ES6.0 and would very much like to keep using elasticsearch-py-async. Since support for the new ES version is there thanks to #29, do you think it would be possible to release a new version that includes these changes?

Thanks a lot!

TypeError: '_asyncio.Task' object is not subscriptable

I want to index documents with the command ./manage.py search_index --rebuild.
The errors are as follows:

Traceback (most recent call last):
  File "./manage.py", line 15, in <module>
    execute_from_command_line(sys.argv)
  File "E:\codes\xtsearch\venv\lib\site-packages\django\core\management\__init__.py", line 371, in execute_from_command_line
    utility.execute()
  File "E:\codes\xtsearch\venv\lib\site-packages\django\core\management\__init__.py", line 365, in execute
    self.fetch_command(subcommand).run_from_argv(self.argv)
  File "E:\codes\xtsearch\venv\lib\site-packages\django\core\management\base.py", line 288, in run_from_argv
    self.execute(*args, **cmd_options)
  File "E:\codes\xtsearch\venv\lib\site-packages\django\core\management\base.py", line 335, in execute
    output = self.handle(*args, **options)
  File "D:\es_test\xtsearch\django_elasticsearch_dsl\management\commands\search_index.py", line 134, in handle
    self._rebuild(models, options)
  File "D:\es_test\xtsearch\django_elasticsearch_dsl\management\commands\search_index.py", line 115, in _rebuild
    self._populate(models, options)
  File "D:\es_test\xtsearch\django_elasticsearch_dsl\management\commands\search_index.py", line 92, in _populate
    doc().update(qs)
  File "D:\es_test\xtsearch\django_elasticsearch_dsl\documents.py", line 231, in update
    self._get_actions(object_list, action), **kwargs
  File "D:\es_test\xtsearch\django_elasticsearch_dsl\documents.py", line 191, in bulk
    return bulk(client=self.connection, actions=actions, **kwargs)
  File "D:\es_test\xtsearch\elasticsearch\helpers\__init__.py", line 257, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "D:\es_test\xtsearch\elasticsearch\helpers\__init__.py", line 192, in streaming_bulk
    raise_on_error, **kwargs)
  File "D:\es_test\xtsearch\elasticsearch\helpers\__init__.py", line 123, in _process_bulk_chunk
    for data, (op_type, item) in zip(bulk_data, map(methodcaller('popitem'), resp['items'])):
TypeError: '_asyncio.Task' object is not subscriptable

5.2.0 release fluke?

I've spotted that the 5.2.0 version on PyPI about a week ago was built from the nick/ssl_context branch (at least, judging by the content of connections.py). Funnier still, this is cached in Travis somehow, so building projects with an es-async requirement ends up with code different from the tar/whl on PyPI.

  1. Is this a known thing?
  2. Are there any plans to change the versioning somehow? As far as I know, the idea was to follow Elastic product versioning in the Python libs. Does anyone have ideas how to protect dependent projects from accidental PyPI builds/publishes?

aiohttp 2.0 incompatibility

Looks like elasticsearch-py-async is incompatible with the 2.0 version of aiohttp.
Reason: recently I started getting the following error:

  File ".../lib/python3.5/site-packages/elasticsearch_async/__init__.py", line 1, in <module>
    from .transport import AsyncTransport
  File ".../lib/python3.5/site-packages/elasticsearch_async/transport.py", line 8, in <module>
    from .connection import AIOHttpConnection
  File ".../lib/python3.5/site-packages/elasticsearch_async/connection.py", line 4, in <module>
    from aiohttp.errors import FingerprintMismatch, ClientError
ImportError: No module named 'aiohttp.errors'

Also, on checking aiohttp's master branch I couldn't find a module named aiohttp.errors, though I found it on the 1.3 branch; forcing aiohttp down to 1.3 also got rid of the error for me.
So I wanted to suggest pinning the aiohttp version to >1.3,<2.0 until elasticsearch_async is migrated to use aiohttp 2.0.

PS: Still in my early stages of Python, so feel free to correct me at any point.

ping() raises instead of returning False

The ping() method should not crash in the async version of the Elasticsearch client; it should behave like the synchronous client. The async version returns True properly, but raises instead of returning False.

Expected behavior

It should return a boolean like the synchronous version:

$ python3
Python 3.8.2 (default, Feb 25 2020, 13:04:52) 
[GCC 9.2.1 20200224] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import elasticsearch
>>> es = elasticsearch.Elasticsearch("127.0.0.1:9")
>>> es.ping()
False
>>>

Actual behavior

The async version raises instead of returning False:

$ python3 -m asyncio
asyncio REPL 3.8.2 (default, Feb 25 2020, 13:04:52) 
[GCC 9.2.1 20200224] on linux
Use "await" directly instead of "asyncio.run()".
Type "help", "copyright", "credits" or "license" for more information.
>>> import asyncio
>>> import elasticsearch_async
>>> es = elasticsearch_async.AsyncElasticsearch("127.0.0.1:9")
>>> await es.ping()
Traceback (most recent call last):
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 936, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore  # noqa
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1025, in create_connection
    raise exceptions[0]
  File "/usr/lib/python3.8/asyncio/base_events.py", line 1010, in create_connection
    sock = await self._connect_sock(
  File "/usr/lib/python3.8/asyncio/base_events.py", line 924, in _connect_sock
    await self.sock_connect(sock, address)
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 494, in sock_connect
    return await fut
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 526, in _sock_connect_cb
    raise OSError(err, f'Connect call failed {address}')
ConnectionRefusedError: [Errno 111] Connect call failed ('127.0.0.1', 9)

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/home/venv/lib/python3.8/site-packages/elasticsearch_async/connection.py", line 98, in perform_request
    response = yield from self.session.request(method, url, data=body, headers=headers)
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/client.py", line 480, in _request
    conn = await self._connector.connect(
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 523, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 858, in _create_connection
    _, proto = await self._create_direct_connection(
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 1004, in _create_direct_connection
    raise last_exc
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 980, in _create_direct_connection
    transp, proto = await self._wrap_create_connection(
  File "/home/home/venv/lib/python3.8/site-packages/aiohttp/connector.py", line 943, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host 127.0.0.1:9 ssl:default [Connect call failed ('127.0.0.1', 9)]

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "<console>", line 1, in <module>
  File "/home/home/venv/lib/python3.8/site-packages/elasticsearch_async/transport.py", line 149, in main_loop
    status, headers, data = yield from connection.perform_request(
  File "/home/home/venv/lib/python3.8/site-packages/elasticsearch_async/connection.py", line 111, in perform_request
    raise ConnectionError('N/A', str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(Cannot connect to host 127.0.0.1:9 ssl:default [Connect call failed ('127.0.0.1', 9)]) caused by: ClientConnectorError(Cannot connect to host 127.0.0.1:9 ssl:default [Connect call failed ('127.0.0.1', 9)])
>>>

Environment

Nothing is listening on port 9.

Installed packages:

$ pipdeptree
elasticsearch-async==6.2.0
  - aiohttp [required: Any, installed: 3.6.2]
    - async-timeout [required: >=3.0,<4.0, installed: 3.0.1]
    - attrs [required: >=17.3.0, installed: 19.3.0]
    - chardet [required: >=2.0,<4.0, installed: 3.0.4]
    - multidict [required: >=4.5,<5.0, installed: 4.7.5]
    - yarl [required: >=1.0,<2.0, installed: 1.4.2]
      - idna [required: >=2.0, installed: 2.9]
      - multidict [required: >=4.0, installed: 4.7.5]
  - async-timeout [required: Any, installed: 3.0.1]
  - elasticsearch [required: >=6.0.0, installed: 7.5.1]
    - urllib3 [required: >=1.21.1, installed: 1.25.8]
pipdeptree==0.13.2
  - pip [required: >=6.0.0, installed: 18.1]
pkg-resources==0.0.0
setuptools==44.0.0
wheel==0.34.2
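
Until this is fixed, a workaround sketch that mirrors the synchronous behaviour by catching the transport's connection error:

from elasticsearch.exceptions import ConnectionError as ESConnectionError

async def safe_ping(client):
    # Map connection failures to False, as the synchronous client does.
    try:
        return await client.ping()
    except ESConnectionError:
        return False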

Is this package maintained?

Hey guys,
thanks for your help with these package!
I would like to know if this package is maintained , and if not, what are the other options for using elasticsearch and asyncio

Thank you guys in advance.

ServerDisconnectedError

When trying to save many documents to an index I'm running into the occasional disconnect for some reason:

elasticsearch.exceptions.ConnectionError: ConnectionError() caused by: ServerDisconnectedError()

Any way to remedy this? Not sure why it happens, but setting up a retry on error may help mitigate it.
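
A possible mitigation sketch, assuming the transport retries on another connection when one raises a connection error (hosts and retry counts are placeholders):

from elasticsearch_async import AsyncElasticsearch

client = AsyncElasticsearch(
    hosts=['localhost', 'other-host'],
    # Assumed mitigation: allow a few retries so a dropped connection is
    # retried elsewhere instead of surfacing immediately.
    max_retries=5,
    retry_on_timeout=True,
)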

Why schedule Task instead of just returning Future?

First, thanks for making this.

When awaiting an indexing operation, transport.py wraps the request with ensure_future(), so it's attached to the transport's own loop, and the task cannot be started on my loop. Why not just return the future without ensure_future()/create_task-ing it?

Elasticsearch 2.x support?

The master branch only supports Elasticsearch>=5.0.0, but some people still need to use 2.x and want to use elasticsearch-py-async with it.
Can we have a separate enhancements and bugfixes branch for 2.x, like elasticsearch-py does?

I've created 2.x branch to which I'd like to backport #12.

What do you think? If possible, how to proceed with it?
Thanks

latest aiohttp support for 5.x branch

Issue #37 was fixed for elasticsearch-py-async versions 6.x and above.
Version 6.0.0 depends on elasticsearch 6.0.0, which is compatible with Elasticsearch 6.x.

In the real world, Elasticsearch 5.x is widely installed, which means elasticsearch-py-async 5.x is needed to talk to an ES 5.x cluster.

Using 5.x causes the same issue as #37 with the latest aiohttp.

Is there a way to get the Timeout to async_timeout change implemented for the 5.x branch?
In essence, just as the https://github.com/elastic/elasticsearch-py package is available for different versions of Elasticsearch, the async version of the client should be available too.

I am willing to contribute.

ANNOUNCE: Asyncio is now supported natively in the 'Elasticsearch' package

Starting in version 7.8.0 of the elasticsearch package, asyncio is now supported natively with async/await on Python 3.6+.
Because of this, this library is now deprecated and will not be adding new features or receiving updates beyond impactful bug fixes for Elasticsearch 6.x.

You can install async support via:

$ python -m pip install elasticsearch[async]

NOTE: If you're still targeting Elasticsearch 6.x you should continue to use this library until you've upgraded to Elasticsearch 7.x or later.
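
For reference, a minimal sketch of the natively supported client (assuming elasticsearch>=7.8 installed with the async extra and a local node):

import asyncio
from elasticsearch import AsyncElasticsearch

async def main():
    client = AsyncElasticsearch(hosts=['localhost'])
    try:
        print(await client.info())
    finally:
        await client.close()

asyncio.run(main())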

Exception when using explicit loop

When the client is constructed with a custom loop:

client = AsyncElasticsearch(hosts=hosts, loop=loop)
await client.get(...whatever...)

an exception is raised:

Traceback (most recent call last):
  File "/store/code/hce/venv/lib/python3.5/site-packages/elasticsearch_async/connection.py", line 54, in perform_request
    with aiohttp.Timeout(timeout or self.timeout):
  File "/store/code/hce/venv/lib/python3.5/site-packages/async_timeout/__init__.py", line 24, in __init__
    loop = asyncio.get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 632, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 578, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'MainThread'.

Problems with CA_CERTS

I need to use certs with my ES connection. In the code, it appears as though ca_certs is accepted in the constructor but unused. Is that correct? Is this a bug, or am I not using ca_certs correctly?
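
A workaround sketch, reusing the ssl_context approach from the README above (the CA path and host are placeholders):

from elasticsearch.connection.http_urllib3 import create_ssl_context
from elasticsearch_async import AsyncElasticsearch

# Build an SSLContext from the CA bundle and pass it explicitly, instead of
# relying on the (apparently unused) ca_certs argument.
context = create_ssl_context(cafile='/path/to/ca.crt')
client = AsyncElasticsearch(hosts=['localhost'], ssl_context=context)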

Twisted Backend

Thank you for your work on this async adapter. Really great.

I would like to implement a Twisted backend. This is prompted by issue 458 from the elasticsearch-py repository.

Should I submit a PR against this repository, or would a separate repository be better (i.e. elasticsearch-py-twisted)? If the former, then I would create two new directories elasticsearch-async/twisted and test_elasticsearch_async/twisted. (Some code duplication might crop up, but a subsequent PR could take care of refactoring.)

Drop Python 3.4 support

Can we drop Python 3.4 support from this library? It has reached end-of-life and is no longer maintained by the Python Software Foundation. See https://www.python.org/dev/peps/pep-0429/#release-schedule for the 3.4 EOL notice.

Dropping 3.4 would allow us to migrate everything to modern async def syntax and use async generators. Async generators would allow this library to support the scroll/scan API as well as the bulk APIs more easily. I've got a fork which already implements most of those helpers as async versions; it was trivial to do with async generator support.

@honzakral @fxdgear

Unclosed connector

Hi,

I am getting an error message when using the client.

Error message

Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7fb608150c88>, 170770.66)]']
connector: <aiohttp.connector.TCPConnector object at 0x7fb608128588>

My code:

elastic_client = AsyncElasticsearch([f'{ES_USER}:{ES_PASSWORD}@{ES_HOST}'], loop=eloop)
await elastic_client.index(index=es_index(doc_type), doc_type=doc_type, body=activity)
elastic_client.transport.close()

This is called in an aiohttp.web.Application middleware. The loop is created when the app is created using asyncio.get_event_loop().

The request still succeeds, but the message is noisy.

I have tried with and without the line elastic_client.transport.close() to no avail. And sometimes (sporadically) with it there I get the error message:

/usr/local/lib/python3.6/site-packages/elasticsearch/connection_pool.py:255: RuntimeWarning: coroutine 'ClientSession.close' was never awaited

However, awaiting the same line throws a different error.

Versions:

python ==3.6.4
elasticsearch-async ==6.0.0
aiohttp ==3.1.3

Am I doing something wrong, or is this a bug?

Thanks!

Is this repository maintained by elastic at all?

Has elastic abandoned this repository?

There are a bunch of MRs up with no replies whatsoever for over a year.

You can't expect anyone to make an effort to contribute if they can't even expect anything to be upstreamed.

NameError: name 'warnings' is not defined in connection.py

File connection.py doesn't contain import warnings, but it should.
Please add it, as warnings is used twice, on lines 44 and 57:

  File "/usr/local/lib/python3.6/site-packages/elasticsearch_async/connection.py", line 44, in init
    warnings.warn('Use of `verify_certs`, `ca_certs` have been deprecated in favor of using SSLContext`', DeprecationWarning)
NameError: name 'warnings' is not defined

DNS cache in `aiohttp.TCPConnector` causes client requests to timeout when host address changes

Hi!

Using domain-based Elasticsearch hosts that change their IP over time can cause the AsyncElasticsearch client to time out requests, as the default DNS cache resolves the hostname to the old address.

For example, with Elasticsearch Heroku add-ons that run on AWS, the host URL doesn't change and looks like https://abcdef-123455.eu-west-1.bonsaisearch.net, but the IP address can change over time. Because of the DNS cache, which isn't flushed, the client tries to send requests to the wrong host IP and fails.

A solution might be to pass an explicit use_dns_cache param to aiohttp.TCPConnector in the constructor of AIOHttpConnection.
Additionally, because the default DNS resolver is thread-based, a dns_resolver param could be used to specify your own, for example an aiodns-based AsyncResolver, although that was removed as the default because of compatibility issues on IPv6 systems.

I can submit a PR with this solution soon.
What do you think?
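
For illustration, the aiohttp-level knobs the proposal refers to (these are aiohttp's own parameters; AIOHttpConnection does not currently expose them):

import aiohttp
from aiohttp.resolver import AsyncResolver

# Sketch: a connector without DNS caching and with an aiodns-based resolver.
connector = aiohttp.TCPConnector(
    use_dns_cache=False,
    resolver=AsyncResolver(),  # optional; requires aiodns
)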

Support for Elasticsearch 6.x

Elasticsearch 6.x doesn't seem to be supported. For instance, when trying to create a document, a 406 response is sent back by the ES server.

Also, the diagnostics are poor: elasticsearch.exceptions.TransportError: <exception str() failed> doesn't tell much.

>>> loop.run_until_complete(client.create(index='qpa', doc_type='qpa', id=42, body={'data': 'str'}))
PUT http://172.99.0.10:9200/qpa/qpa/42/_create [status:406 request:0.010s]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.6/site-packages/elasticsearch_async/transport.py", line 140, in main_loop
    method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/local/lib/python3.6/site-packages/elasticsearch_async/connection.py", line 74, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/local/lib/python3.6/site-packages/elasticsearch/connection/base.py", line 125, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.TransportError: <exception str() failed>

Exceptions on sniffing error

After 2d82b1b, the next call to the Elasticsearch server may raise an error if the last sniff failed.

As far as I can see, elasticsearch-py suppresses exceptions like ConnectionError and only raises TransportError, but elasticsearch-py-async raises all errors (connection errors, timeouts, cancellations, etc.).

Another problem is when the library reports the problem: elasticsearch-py raises immediately, but elasticsearch-py-async will report a sniffing problem that happened in the past.

I did not dig into the code too deeply, but maybe moving sniffing from the synchronous get_connection into the async perform_request makes sense?
