
aiojobs's Introduction

aiojobs


Job scheduler for managing background tasks (asyncio)

The library provides a controlled way to schedule background tasks for asyncio applications.

Installation

$ pip3 install aiojobs

Usage example

import asyncio
import aiojobs

async def coro(timeout):
    await asyncio.sleep(timeout)

async def main():
    scheduler = aiojobs.Scheduler()
    for i in range(100):
        # spawn jobs
        await scheduler.spawn(coro(i/10))

    await asyncio.sleep(5.0)
    # not all scheduled jobs are finished at the moment

    # gracefully close spawned jobs
    await scheduler.close()

asyncio.get_event_loop().run_until_complete(main())

Integration with aiohttp.web

from aiohttp import web
from aiojobs.aiohttp import setup, spawn

async def handler(request):
    await spawn(request, coro())
    return web.Response()

app = web.Application()
app.router.add_get('/', handler)
setup(app)

or just

from aiojobs.aiohttp import atomic

@atomic
async def handler(request):
    return web.Response()

For more information read the documentation: https://aiojobs.readthedocs.io

Communication channels

aio-libs google group: https://groups.google.com/forum/#!forum/aio-libs

Feel free to post your questions and ideas here.

Gitter Chat https://gitter.im/aio-libs/Lobby

We also support Stack Overflow. Please add the python-asyncio or aiohttp tag to your question there.

Author and License

The aiojobs package is written by Andrew Svetlov.

It's Apache 2 licensed and freely available.


aiojobs's Issues

@atomic not applicable for the Class Based Views

File "/home/max/work/hc_market/env/lib/python3.6/site-packages/aiohttp/web_urldispatcher.py", line 753, in __await__
    return (yield from self.__iter__())
File "/home/max/work/hc_market/env/lib/python3.6/site-packages/aiohttp/web_urldispatcher.py", line 748, in __iter__
    resp = yield from method()
File "/home/max/work/hc_market/env/lib/python3.6/site-packages/aiojobs/aiohttp.py", line 28, in wrapper
    job = await spawn(request, coro(request))
File "/home/max/work/hc_market/env/lib/python3.6/site-packages/aiojobs/aiohttp.py", line 22, in spawn
    return await get_scheduler(request).spawn(coro)
File "/home/max/work/hc_market/env/lib/python3.6/site-packages/aiojobs/aiohttp.py", line 10, in get_scheduler
    scheduler = get_scheduler_from_app(request.app)
AttributeError: 'Test' object has no attribute 'app'

Scheduler.close() takes a lot of time to finish

I spawn many thousands of jobs on a scheduler, and after I hit ctrl-c I call Scheduler.close() to stop all the jobs. It takes ages to finish, because currently the job _close() method starts the task just to prevent a RuntimeWarning:

    async def _close(self, timeout):
        self._closed = True
        if self._task is None:
            # the task is closed immediately without actual execution
            # it prevents a warning like RuntimeWarning: coroutine 'coro' was never awaited
            self._start()

I believe this behavior could be optional, as I would much prefer to stop the scheduler ASAP and don't care about coroutines that are never awaited.

Why does aiojobs depend on aiohttp?

This seems to have happened during the migration to flit at 78cd035

While aiojobs provides some handy integration with aiohttp, it seems strange that it depends on it. What if we would like to use aiojobs without aiohttp? Is that still possible?

Fixed time scheduler

If you want to improve the scheduler, consider adding fixed-time scheduling, along the lines of the schedule library (a plain-aiojobs sketch follows the example below):

schedule.every(10).minutes.do(job, *args, **kwargs)
schedule.every(2).hours.do(job, *args, **kwargs)
schedule.every().day.at("10:30").do(job, *args, **kwargs)
schedule.every().monday.do(job, *args, **kwargs)
schedule.every().wednesday.at("13:15").do(job, *args, **kwargs)
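
aiojobs itself has no calendar-style scheduling, but a fixed-interval job can already be expressed with a plain loop inside a spawned coroutine. A minimal sketch (the every() helper and job() below are hypothetical, not part of aiojobs):

import asyncio

import aiojobs


async def job():
    print("tick")


async def every(interval, coro_func, *args, **kwargs):
    # Re-run the coroutine function forever with a fixed delay between runs.
    while True:
        await coro_func(*args, **kwargs)
        await asyncio.sleep(interval)


async def main():
    scheduler = aiojobs.Scheduler()
    # Roughly the effect of schedule.every(10).minutes.do(job).
    await scheduler.spawn(every(10 * 60, job))
    await asyncio.sleep(3600)
    await scheduler.close()


asyncio.run(main())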

job.wait returns None when already closed instead of real result

import asyncio
import aiojobs


async def func():
    return 123


async def probe():
    scheduler = await aiojobs.create_scheduler()

    job_list = []
    for i in range(10):
        job = await scheduler.spawn(func())
        job_list.append(job)

    res_list = []
    for job in job_list:
        res = await job.wait()
        res_list.append(res)

    print(res_list)

    await scheduler.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(probe())

Expected output:

[123, 123, 123, 123, 123, 123, 123, 123, 123, 123]

But it prints:

[123, None, None, None, None, None, None, None, None, None]

aiojobs-0.2.2

aiojobs has no function to force cancel the task

I faced a problem closing a job without waiting for its task to finish. I'm running an endless "while True" loop inside the job and I need to kill/cancel/close it and create a new one with fresh data on request. Any thoughts?
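
For reference, Job.close() does cancel the underlying asyncio task (see the documentation issue further below), so one possible workaround is to keep the Job handle and close it before spawning a replacement. A minimal sketch, assuming the endless loop lives in a hypothetical worker() coroutine:

import asyncio

import aiojobs


async def worker():
    while True:  # endless loop over (possibly stale) data
        await asyncio.sleep(1)


async def main():
    scheduler = aiojobs.Scheduler()
    job = await scheduler.spawn(worker())
    await asyncio.sleep(3)
    # Job.close() cancels the underlying task, so the stale job
    # can be replaced with a fresh one on request.
    await job.close()
    job = await scheduler.spawn(worker())
    await scheduler.close()


asyncio.run(main())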

Deprecation warnings in asyncio with Python 3.8

When using aiojobs with Python 3.8 with warnings enabled, deprecation warnings are emitted at various places where a loop argument is passed to asyncio APIs. This is because explicit passing of that argument was deprecated and will be removed in Python 3.10. For some of the warnings, see for example this Travis CI build.

It would be good to fix these warnings. Looking at the code, it seems like the most sensible course of action would be to stop passing the loop argument to Job and Scheduler and just switch them to asyncio.get_running_loop() (with an asyncio.get_event_loop() fallback for Python < 3.7).
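
A minimal sketch of that helper (an illustration of the suggestion, not the actual aiojobs code), assuming it is only called while a loop is running:

import asyncio
import sys


def _get_loop():
    # Python 3.7+: only valid from inside a running coroutine or callback.
    if sys.version_info >= (3, 7):
        return asyncio.get_running_loop()
    # Fallback for Python < 3.7.
    return asyncio.get_event_loop()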

Cancel on waiting jobs should be protected

Example:

from aiohttp import web
from aiojobs.aiohttp import setup, spawn

async def handler(request):
    await spawn(request, coro())
    return web.Response()

app = web.Application()
app.router.add_get('/', handler)
setup(app)

Cancellation of await spawn(request, coro()) by https://github.com/aio-libs/aiohttp/pull/2257/files should be covered by asyncio.shield or a custom wrapper.

As well, the aiohttp docs for now can use a combination of asyncio.shield and spawn to eliminate the unexpected behavior.
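
A rough illustration of the asyncio.shield workaround mentioned above (a sketch only, not the fix that was eventually adopted):

import asyncio

from aiohttp import web
from aiojobs.aiohttp import spawn


async def coro():
    await asyncio.sleep(1)


async def handler(request):
    # Shield the spawn call so that cancelling the request handler
    # cannot cancel the job registration itself.
    await asyncio.shield(spawn(request, coro()))
    return web.Response()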

Usage example in README.md is broken

In README.md we have the following Usage example section:

import asyncio
import aiojobs

async def coro(timeout):
    await asyncio.sleep(timeout)

async def main():
    scheduler = aiojobs.create_scheduler()
    for i in range(100):
        # spawn jobs
        await scheduler.spawn(coro(i/10))

    await asyncio.sleep(5.0)
    # not all scheduled jobs are finished at the moment

    # gracefully close spawned jobs
    await scheduler.close()

asyncio.get_event_loop().run_until_complete(main())

When I run it with the Python interpreter (I have Python 3.6.1), I get the following error:

Traceback (most recent call last):
  File "test_background.py", line 50, in <module>
    asyncio.get_event_loop().run_until_complete(main())
  File "/Users/dubovoy/anaconda/lib/python3.6/asyncio/base_events.py", line 466, in run_until_complete
    return future.result()
  File "test_background.py", line 42, in main
    await scheduler.spawn(coro(i/10))
AttributeError: 'coroutine' object has no attribute 'spawn'
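
The traceback suggests the example is simply missing an await: in aiojobs 0.x, create_scheduler() is a coroutine, so the line likely needs to be:

scheduler = await aiojobs.create_scheduler()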

TypeError: timeout() got an unexpected keyword argument 'timeout'

aiojobs==0.3.0
async-timeout==4.0.0

Traceback (most recent call last):
  File "/home/user/.pyenv/versions/project/lib/python3.9/site-packages/aiojobs/_job.py", line 58, in wait
    return await asyncio.shield(self._do_wait(timeout),
  File "/home/user/.pyenv/versions/project/lib/python3.9/site-packages/aiojobs/_job.py", line 47, in _do_wait
    with async_timeout.timeout(timeout=timeout, loop=self._loop):
TypeError: timeout() got an unexpected keyword argument 'timeout'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/lang/projects/slot.py", line 254, in _calc_tk_inner
    res = await job.wait(timeout=tk.timeout)
  File "/home/user/.pyenv/versions/project/lib/python3.9/site-packages/aiojobs/_job.py", line 64, in wait
    await self._close(scheduler.close_timeout)
  File "/home/user/.pyenv/versions/project/lib/python3.9/site-packages/aiojobs/_job.py", line 87, in _close
    with async_timeout.timeout(timeout=timeout,
TypeError: timeout() got an unexpected keyword argument 'timeout'

aiojobs are cancelled (concurrent.futures._base.CancelledError)

When I use the aiohttp.ClientSession request method to make a POST request, it encounters CancelledErrors, and I read the issue in https://github.com/aio-libs/aiohttp/issues/2056
I changed the original code

async with self.session() as session:
    async with session.request(
        method,
        url,
        headers=headers,
        params=self.combine_parameters(kwargs),
        **kwargs
    ) as resp:
        if resp.status >= 400:
            raise ClientResponseError(
                resp.status, resp.reason, await resp.content.read()
            )
        return await getattr(resp, return_method)()

to

async def my_asyn_job(session,method,url,headers,params,**kwargs):
    async with session.request(
            method,
            url,
            headers=headers,
            params=params,
            **kwargs
    ) as resp:
            if resp.status >= 400:
                raise ClientResponseError(
                    resp.status, resp.reason, await resp.content.read()
                )
            return await getattr(resp, return_method)()

async with self.session() as session:
    job = await scheduler.spawn(my_asyn_job(session,method, url,headers=headers,params=self.combine_parameters(kwargs),**kwargs))
    resp = await job.wait(timeout=300.0)
    await scheduler.close()
    return resp

However, I still encounter concurrent.futures._base.CancelledError (read the log from bottom to top):

2019-12-28T18:46:45.692202891+08:00 concurrent.futures._base.CancelledError
2019-12-28T18:46:45.692196304+08:00 await self._waiter
2019-12-28T18:46:45.692193847+08:00 File "/usr/local/lib/python3.6/site-packages/aiohttp/streams.py", line 588, in read
2019-12-28T18:46:45.692191606+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692189024+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.692186616+08:00 message, payload = await self._protocol.read() # type: ignore # noqa
2019-12-28T18:46:45.692183604+08:00 File "/usr/local/lib/python3.6/site-packages/aiohttp/client_reqrep.py", line 844, in start
2019-12-28T18:46:45.692181182+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692178663+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.69217598+08:00 await resp.start(conn)
2019-12-28T18:46:45.692173366+08:00 File "/usr/local/lib/python3.6/site-packages/aiohttp/client.py", line 497, in _request
2019-12-28T18:46:45.692170814+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692163395+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.692160872+08:00 self._resp = await self._coro
2019-12-28T18:46:45.692158225+08:00 File "/usr/local/lib/python3.6/site-packages/aiohttp/client.py", line 1005, in __aenter__
2019-12-28T18:46:45.692155291+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692151973+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.69214951+08:00 **kwargs
2019-12-28T18:46:45.692146847+08:00 File "/data/code/rasa-1.2.9/rasa/utils/endpoints.py", line 144, in request
2019-12-28T18:46:45.692144459+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692141778+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.692139103+08:00 json=json_body, method="post", timeout=DEFAULT_REQUEST_TIMEOUT
2019-12-28T18:46:45.692136388+08:00 File "/data/code/rasa-1.2.9/rasa/core/actions/action.py", line 399, in run
2019-12-28T18:46:45.692133807+08:00 return self.gen.throw(type, value, traceback)
2019-12-28T18:46:45.692129992+08:00 File "/usr/local/lib/python3.6/asyncio/coroutines.py", line 129, in throw
2019-12-28T18:46:45.692127126+08:00 events = await action.run(output_channel, nlg, tracker, self.domain)
2019-12-28T18:46:45.692124325+08:00 File "/data/code/rasa-1.2.9/rasa/core/processor.py", line 446, in _run_action
2019-12-28T18:46:45.692121434+08:00 Traceback (most recent call last):

I'm new to aiohttp and I can't quite understand the reason for this mistake. My two servers interact with each other through a proxy server. Maybe the proxy server times out and then disconnects. I think my mistake is very similar to the issue mentioned above.

aiohttp should be optional

The dependency on aiohttp should be optional.

Currently pip install aiojobs will also install aiohttp.

Calling .wait() or .close() on failed job (at .closed state) causes an exception

Hi,

thanks for your library!
Trying to figure out how to forward exceptions from spawned jobs into the main code, I ran into incorrect behavior. If a job has failed (raised an exception inside) and its state was set to closed, calling job.wait() on it raises the following exception:

  File "/usr/local/lib/python3.6/site-packages/aiohttp/web_protocol.py", line 381, in start
    resp = await self._request_handler(request)
  File "/usr/local/lib/python3.6/site-packages/aiohttp/web_app.py", line 310, in _handle
    resp = await handler(request)
  File "test.py", line 76, in check
    print(await job.wait())
  File "/usr/local/lib/python3.6/site-packages/aiojobs/_job.py", line 62, in wait
    await self._close(scheduler.close_timeout)
AttributeError: 'NoneType' object has no attribute 'close_timeout'

It seems there should either be different behavior, such as idempotent waiting on jobs, or waiting on closed jobs should simply be forbidden.

Graceful shutdowns via Scheduler.wait()

I'm writing an app that requires a few background jobs to fetch requests from a queue and process them atomically.

When the main app receives a shutdown signal, I'm firing an asyncio.Event to let the jobs know they need to close after they finish processing the task, and I then Job.wait() each of them.

It would be more convenient to do this via the Scheduler class. Currently it only has support for close()ing all the jobs, but not for wait()ing them. Does it make sense to add such a function? A simplified version would look like this:

class Scheduler:
    async def wait(self):
        await asyncio.gather(*[job.wait() for job in self._jobs])

Plans to integrate thread pool executor/process pool executor (potentially aioprocessing?)

I think aiojobs can be quite a useful abstraction for any usage of asyncio for long-running tasks (or tasks that live longer than a single request-response lifetime when used with aiohttp).

However, aiojobs currently doesn't say anything about the usage of threads and/or processes.

It would be really helpful if aiojobs integrated some native form of threading using threadpoolexecutor or processpoolexecutor.

The aioprocessing library can be a starting point for using a process pool.

Imagine being able to create the aiojobs scheduler while specifying a different executor. The default executor might just run everything on the same asyncio loop, with optionally a thread-pool executor or a process-pool executor.

That way I might create three aiojobs schedulers if I have a complex aiohttp application that sometimes requires running jobs that need to live past a single request/response lifecycle, sometimes jobs that need IO-parallelism, and sometimes jobs that need CPU-parallelism.

Here's an example of using asyncio with threadpool and processpool executors: https://gist.github.com/jmbjorndalen/e1cbd93c475792c83f79ef475345ed00
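
aiojobs has no built-in executor support today, but a spawned coroutine can already delegate blocking work to a thread or process pool via loop.run_in_executor. A minimal sketch (the helper names below are hypothetical):

import asyncio
from concurrent.futures import ProcessPoolExecutor

import aiojobs


def cpu_bound(n):
    # A plain synchronous, CPU-heavy function.
    return sum(i * i for i in range(n))


async def run_in_pool(executor, func, *args):
    # Bridge a blocking callable into a coroutine that aiojobs can schedule.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, func, *args)


async def main():
    scheduler = aiojobs.Scheduler()
    with ProcessPoolExecutor() as pool:
        job = await scheduler.spawn(run_in_pool(pool, cpu_bound, 10_000_000))
        print(await job.wait())
        await scheduler.close()


asyncio.run(main())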

graceful shutdown -- wait for all jobs to finish

I am using aiohttp, and I am using spawn to execute some atomic processes in the background. I wonder how I can wait for all jobs to finish before I shut down the aiohttp server.

Currently, I am using the following code:

async def shutdown(app):
    jobs = get_scheduler_from_app(app)._jobs
    while True:
        await asyncio.sleep(0)
        if len(jobs) == 0:
            return

app.on_shutdown.append(shutdown)

I am not sure whether it's a good/correct way to do that. I hope there will be a method like scheduler.wait_for_close() to do it.

PS: I didn't set the limit or the pending limit for the scheduler yet. I know there is a job queue and a pending queue; I'm not sure whether that will affect my code above.
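
Until something like scheduler.wait_for_close() exists, an alternative sketch (assuming the standard aiojobs.aiohttp setup, so the scheduler is reachable via get_scheduler_from_app) is to gather Job.wait() for all current jobs instead of polling:

import asyncio

from aiojobs.aiohttp import get_scheduler_from_app


async def shutdown(app):
    scheduler = get_scheduler_from_app(app)
    # The scheduler is iterable over its jobs, so wait for every one of them.
    await asyncio.gather(*(job.wait() for job in scheduler))

app.on_shutdown.append(shutdown)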

Why python >= 3.5.3 ?

Hi,

Is there a reason for the python_requires='>=3.5.3' ?
Python 3.5.2 is the default for ubuntu LTS 16.04 for instance.

Was there a fix in python between 3.5.2 and 3.5.3 ?

Thanks !

Broken Scheduler.close() behavior on BaseException

If a BaseException which is not an Exception is thrown inside a job (for example, KeyboardInterrupt), the Scheduler._wait_failed() task fails to catch this exception after re-raising it, and this exception gets raised from the Scheduler.close() method (due to await self._failed_task), which is totally unexpected.

Probably the intended invariant in Scheduler._wait_failed() was to never raise an exception.

Upgrade to 0.4.0

Please upgrade lib to 0.4.0 because PyPI version 0.3.0 doesn't contain aiojobs.aiohttp.atomic decorator

Aiohttp: graceful shutdown doesn't wait for an aiojobs.aiohttp.atomic handler to finish within a cancelled request.

Have some view decorated with aiojobs.aiohttp.atomic; name it The Handler.
The Handler writes some important data to the database in multiple steps.
Some client makes a request to The Handler and cancels it (e.g. due to a poor connection).
The Handler is started.
At the same time, we decide to update the code on the server, so we make a graceful shutdown.
The server gracefully shut down, but The Handler was not finished, and we lost some data.

Expected behaviour

The graceful shutdown should wait for The Handler to finish, even if it is within the canceled request.

Actual behaviour

The graceful shutdown doesn't wait for The Handler to finish if it is within the canceled request.

Steps to reproduce

Here is the script which reproduces that behavior (run with arg test, like python3 main.py test). It runs multiple scenarios:

  • await_return - await for slow_task, return the response
  • spawn_wait_return - spawn new job, wait for it, return the response
  • spawn_return - spawn new job without waiting for it, return the response
  • spawn_response_asap_wait - spawn new job, write to response, wait for the job to finish
  • atomic_simple - decorated with atomic, await for slow_task, return the response
  • atomic_response_asap - decorated with atomic, write to response, await for slow_task

For every scenario:

  • create a server on 127.0.0.1:8080, make a request using curl, make graceful shutdown, print outputs.
  • create a server on 127.0.0.1:8080, make a request using curl, cancel request (kill curl), make graceful shutdown, print outputs.

After all, it will print the following table:

scenario                  |                normal                 |          client_early_close          
                          | s. time | c. time | started |   ended | s. time | c. time | started |   ended
             await_return |   6.16s |   5.16s |    True |    True |   1.61s |   0.61s |    True |   False
        spawn_wait_return |   6.17s |   5.16s |    True |    True |   1.61s |   0.60s |    True |   False
             spawn_return |   1.62s |   0.61s |    True |   False |   1.61s |   0.60s |    True |   False
 spawn_response_asap_wait |   1.61s |   0.61s |    True |   False |   1.61s |   0.61s |    True |   False
            atomic_simple |   6.18s |   5.17s |    True |    True |   1.62s |   0.61s |    True |   False
     atomic_response_asap |   1.61s |   0.61s |    True |   False |   1.61s |   0.61s |    True |   False

The most important columns are started, which indicates whether slow_task was started, and ended, which indicates whether slow_task finished.
As you can see, in the cases where the client closed the connection, even with the atomic handler the job was started but terminated before it finished.

import logging
import sys
from datetime import datetime
import subprocess, time, os, signal

from aiohttp import web
from aiohttp.web_response import StreamResponse
from aiojobs.aiohttp import setup, spawn, atomic

import asyncio


logger = logging.getLogger(__name__)

SLOW_TASK_DURATION = 5


async def slow_task():
    logger.info('slow_task started')
    await asyncio.sleep(SLOW_TASK_DURATION)
    logger.info('slow_task ended')


async def await_return(request):
    await slow_task()
    return web.Response(text='It works!!! await_return')


async def spawn_wait_return(request):
    job = await spawn(request, slow_task())
    await job.wait()
    return web.Response(text='It works!!! spawn_wait_return')


async def spawn_return(request):
    await spawn(request, slow_task())
    return web.Response(text='It works!!! spawn_return')


async def spawn_response_asap_wait(request):
    response = StreamResponse()
    job = await spawn(request, slow_task())
    await response.prepare(request)
    await response.write_eof('It works!!! spawn_response_asap_wait'.encode('utf-8'))
    await job.wait()
    return response


@atomic
async def atomic_simple(request):
    await slow_task()
    return web.Response(text='It works!!! atomic_simple')

@atomic
async def atomic_response_asap(request):
    response = StreamResponse()
    await response.prepare(request)
    await response.write_eof('It works!!! atomic_response_asap'.encode('utf-8'))
    await slow_task()
    return response


@web.middleware
async def timeit_middleware(request: web.Request, handler):
    start = datetime.now()
    response = await handler(request)
    end = datetime.now()
    logger.info('Time spend = %s' % (end - start))
    return response


functions = [
    await_return,
    spawn_wait_return,
    spawn_return,
    spawn_response_asap_wait,
    atomic_simple,
    atomic_response_asap,
]


logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(module)s %(funcName)s [%(levelname)-5.5s]  %(message)s",
    handlers=[
        logging.StreamHandler()
    ])


def add_padding_to_lines(lines, padding='\t\t\t'):
    return padding + padding.join(lines)


def shortify_timedelta(td):
    mm, ss = divmod(td.seconds, 60)
    hh, mm = divmod(mm, 60)
    s = ''
    if hh > 0:
        s += "%d:" % hh
    if mm > 0 or hh > 0:
        s += "%02d:" % mm
    if mm > 0 or hh > 0:
        s += "%02d" % ss
    else:
        s += "%d" % ss
    if td.microseconds:
        s = s + ".%d" % (td.microseconds/10000.0)  # round 2 digits
    return s + 's'


async def on_cleanup_make_message(app):
    # leave some message, to verify that on_cleanup was called
    print('MY_ON_CLEANUP_MESSAGE')
    logger.info('MY_ON_CLEANUP_MESSAGE')


def create_app():
    app = web.Application(middlewares=[
        timeit_middleware,
    ])
    setup(app, close_timeout=SLOW_TASK_DURATION * 2)
    app.on_cleanup.append(on_cleanup_make_message)
    for func in functions:
        app.router.add_get('/%s' % func.__name__, func)
    return app


if __name__ == '__main__':
    if len(sys.argv) == 1:
        print('Running server')
        app = create_app()
        web.run_app(app, host='127.0.0.1', port=8080)
    else:
        print('Running tests')
        results = {}
        MODES = ('normal', 'client_early_close')
        for func in functions:
            print('%s' % func.__name__)
            results[func.__name__] = {}
            for mode in MODES:
                print('\t%s' % mode)
                server_start_time = datetime.now()
                server_end_time = None
                server = subprocess.Popen(args=['python3', __file__],
                                        stderr=subprocess.STDOUT,
                                        stdout=subprocess.PIPE,
                                        universal_newlines=True)
                time.sleep(1)  # wait for server to startup
                curl_start_time = datetime.now()
                curl_end_time = None
                curl = subprocess.Popen(args=['curl', '127.0.0.1:8080/%s' % func.__name__],
                                        stderr=subprocess.STDOUT,
                                        stdout=subprocess.PIPE,
                                        universal_newlines=True)
                time.sleep(0.5)  # wait for curl to start request

                if mode == MODES[1]:  # early close
                    os.kill(curl.pid, signal.SIGTERM)  # cancel request from client side without awaiting full response

                os.kill(server.pid, signal.SIGINT)  # send keyboard interrupt to make graceful shutdown
                while server.poll() is None or curl.poll() is None:  # wait for server and curl
                    if server.poll() is not None and server_end_time is None:
                        server_end_time = datetime.now()
                    if curl.poll() is not None and curl_end_time is None:
                        curl_end_time = datetime.now()
                    time.sleep(0.1)

                if server_end_time is None:
                    server_end_time = datetime.now()

                if curl_end_time is None:
                    curl_end_time = datetime.now()
                # print some logs
                server_output = add_padding_to_lines(server.stdout.readlines())
                curl_output = add_padding_to_lines(curl.stdout.readlines())
                print('\t\tServer output:%s' % server_output)
                print('\t\tCurl stdout:%s' % curl_output)
                slow_task_ended = 'slow_task ended' in server_output
                slow_task_started = 'slow_task started' in server_output
                results[func.__name__][mode] = {
                    'server_time': server_end_time - server_start_time,
                    'curl_time': curl_end_time - curl_start_time,
                    'task_started': slow_task_started,
                    'task_ended': slow_task_ended,
                }
            print('\n' * 3)
        print('{:25} | {:^37} | {:^37}'.format('scenario', *MODES))
        print('%25s | %7s | %7s | %7s | %7s | %7s | %7s | %7s | %7s' % ('',
                                                                          's. time', 'c. time', 'started', 'ended',
                                                                          's. time', 'c. time', 'started', 'ended'))
        for func in functions:
            print('%25s' % func.__name__, end='')
            for mode in MODES:
                values = results[func.__name__][mode]
                print(' | %7s | %7s | %7s | %7s' %
                      (shortify_timedelta(values['server_time']), shortify_timedelta(values['curl_time']),
                       values['task_started'], values['task_ended'])
                      , end='')
            print()
        print('*s.time - very rough server time')
        print(' c.time - very rough curl time')

Your environment

OS: macOS Mojave 10.14.2
Python 3.6.5
aiohttp==3.4.4
aiojobs==0.2.2

Aiojobs stops processing jobs when CancelledError occurs.

aiohttp==3.5.4
aiojobs==0.2.2

Server example:

import asyncio

from aiohttp import web
from aiojobs.aiohttp import setup, spawn


async def bar():
    await asyncio.sleep(1)


async def foo(request):
    await spawn(request, bar())
    return web.Response(text="Foo")


@web.middleware
async def error_middleware(request, handler):
    try:
        return await handler(request)
    except asyncio.CancelledError:
        print('CancelledError occurred.')
        raise

app = web.Application()
app.router.add_get('/foo', foo, name='index')
app.middlewares.append(error_middleware)
setup(app, pending_limit=100, limit=10)

web.run_app(app)

Then Im running ab -c 100 -n 200 "http://127.0.0.1:8080/foo" to load application.
So when the pending_limit "pool" is full we receive a CancelledError and then we won't be able to spawn new jobs. Previously spawned jobs won't be processed either, so http://127.0.0.1:7000/foo won't work until the application restarts. Also, the aiojobs scheduler is not closed at this moment.

create_scheduler returns a scheduler that runs in a different loop than the main aiohttp app

Looks like aiojobs.create_scheduler returns a scheduler bound to a different loop:

async def create_scheduler(*, close_timeout=0.1, limit=100,
                                             exception_handler=None):
    if exception_handler is not None and not callable(exception_handler):
        raise TypeError('A callable object or None is expected, '
                        'got {!r}'.format(exception_handler))
    loop = asyncio.get_event_loop()  # <- Here created new loop
    return Scheduler(loop=loop, close_timeout=close_timeout,
                     limit=limit,
                     exception_handler=exception_handler)
def setup(app, **kwargs):
    async def on_startup(app):
        app['AIOJOBS_SCHEDULER'] = await create_scheduler(**kwargs)

Probably create_scheduler should receive a loop?

async def create_scheduler(*, close_timeout=0.1, limit=100,
                                             exception_handler=None, loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()
def setup(app, **kwargs):
    async def on_startup(app):
        app['AIOJOBS_SCHEDULER'] = await create_scheduler(**kwargs, loop=app.loop)

The @atomic decorator does not work for class-grouped handlers

I read aiohttp's docs, which teach me to use aiojobs' atomic decorator to avoid request cancellation.

And I found it doesn't work on class-grouped handlers. I don't mean class-based views, just plain method handlers like this:

class UserRelated(object):
     
    def create_user_handler(self, request):
        ...

I investigated the source code and confirmed the @atomic decorator causes the problem.

Traceback on class grouped handler

web_protocol.py            310 ERROR    Error handling request
Traceback (most recent call last):
  File ".../lib/python3.6/site-packages/aiohttp/web_protocol.py", line 381, in start
    resp = await self._request_handler(request)
  File ".../lib/python3.6/site-packages/aiohttp/web_app.py", line 322, in _handle
    resp = await handler(request)
TypeError: wrapper() takes 1 positional argument but 2 were given

If you also think this is a bug, I can fix it

Scheduler stops scheduling tasks

aiojobs==0.2.2

If a lot of new jobs arrive within a short time span, scheduler.active_count may become greater than scheduler.limit. Because of this, the scheduler stops scheduling pending jobs in its _done callback. After that, new jobs block forever while waiting for a free slot in the pending queue, jobs already in the pending queue never get scheduled, and everything just hangs.

I was able to reproduce this with aiojobs==0.2.2 with the following script:

import asyncio

import aiojobs

_LIMIT = 10
_PENDING_LIMIT = 100
_JOB_DURATION = 5  # seconds
_NUM_JOBS = 200


async def slow_job(job_num):
    print('Slow job', job_num, 'starting')
    await asyncio.sleep(_JOB_DURATION)
    print('Slow job', job_num, 'done')


def spawn_jobs(scheduler, loop):
    spawn_tasks = []
    for job_num in range(_NUM_JOBS):
        spawn_task = asyncio.ensure_future(
            scheduler.spawn(slow_job(job_num)),
            loop=loop)
        spawn_tasks.append(spawn_task)
    return spawn_tasks


def print_conclusion(spawn_tasks, scheduler):
    num_spawned = len([t for t in spawn_tasks if t.done()])
    print(f'{num_spawned} jobs out of {len(spawn_tasks)} were spawned')
    print(
        f'Scheduler active job count is {scheduler.active_count}, with '
        f'pending queue size {scheduler.pending_count} and active job limit '
        f'{scheduler.limit}. This means that scheduler will never schedule a '
        f'task any more'
    )


async def amain(loop):
    print(f'Using aiojobs=={aiojobs.__version__}')
    scheduler = await aiojobs.create_scheduler(
        limit=_LIMIT, pending_limit=_PENDING_LIMIT)
    spawn_tasks = spawn_jobs(scheduler, loop)
    await asyncio.sleep(_JOB_DURATION * 2)
    print_conclusion(spawn_tasks, scheduler)
    jobs = await asyncio.gather(*spawn_tasks)  # blocks forever
    await asyncio.gather(*jobs)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(amain(loop))


if __name__ == '__main__':
    main()

The problem seems to be that active_count does not really return the number of jobs that are "active", more like the number of all jobs not in the pending queue, and this number has no upper bound.

@atomic is buggy when used with views

A user reported on gitter chat that the decorator should be fixed like this:

from functools import wraps

from aiohttp.web import View
from aiojobs.aiohttp import spawn


def atomic(coro):
    @wraps(coro)
    async def wrapper(request):
        if isinstance(request, View):
            # Class Based View decorated: unwrap the actual request.
            job = await spawn(request.request, coro(request))
        else:
            job = await spawn(request, coro(request))
        return await job.wait()
    return wrapper

missing async-timeout requirement

Looks like aiojobs has an unsatisfied requirement:

homo@darwin:~/git/owlery/hardservice$ mkdir dvenv
homo@darwin:~/git/owlery/hardservice$ virtualenv --python=python3 dvenv
created virtual environment CPython3.8.5.final.0-64 in 295ms
  creator CPython3Posix(dest=/home/homo/git/owlery/hardservice/dvenv, clear=False, global=False)
  seeder FromAppData(download=False, wheel=latest, six=latest, progress=latest, appdirs=latest, lockfile=latest, pip=latest, distro=latest, html5lib=latest, setuptools=latest, CacheControl=latest, ipaddr=latest, certifi=latest, msgpack=latest, retrying=latest, distlib=latest, colorama=latest, pkg_resources=latest, packaging=latest, webencodings=latest, requests=latest, urllib3=latest, contextlib2=latest, pytoml=latest, chardet=latest, idna=latest, pyparsing=latest, pep517=latest, via=copy, app_data_dir=/home/homo/.local/share/virtualenv/seed-app-data/v1.0.1.debian)
  activators BashActivator,CShellActivator,FishActivator,PowerShellActivator,PythonActivator,XonshActivator
homo@darwin:~/git/owlery/hardservice$ source dvenv/bin/activate
(dvenv) homo@darwin:~/git/owlery/hardservice$ pip install aiojobs
Collecting aiojobs
  Using cached aiojobs-0.3.0-py3-none-any.whl (10.0 kB)
Installing collected packages: aiojobs
Successfully installed aiojobs-0.3.0
(dvenv) homo@darwin:~/git/owlery/hardservice$ python socket_service/socket_server.py 
Traceback (most recent call last):
  File "socket_service/socket_server.py", line 2, in <module>
    import aiojobs
  File "/home/homo/git/owlery/hardservice/dvenv/lib/python3.8/site-packages/aiojobs/__init__.py", line 13, in <module>
    from ._scheduler import Scheduler
  File "/home/homo/git/owlery/hardservice/dvenv/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 3, in <module>
    from ._job import Job
  File "/home/homo/git/owlery/hardservice/dvenv/lib/python3.8/site-packages/aiojobs/_job.py", line 5, in <module>
    import async_timeout
ModuleNotFoundError: No module named 'async_timeout'

Documentation should explain what "closing" a job means

The documentation for Job.close() is very vague: https://aiojobs.readthedocs.io/en/stable/api.html#aiojobs.Job.close

coroutine close(*, timeout=None)
Close the job.

If timeout exceeded asyncio.TimeoutError raised.

The job is in closed state after finishing the method.

Yes, but what does "closing" a job really entail? I suspect cancellation via asyncio.Task.cancel(), but the only way of knowing is to read the source code.

(goes read the source)

Yes, indeed it does asyncio.Task.cancel(). That is good. But docs need to be more specific. Perhaps linking to the asyncio docs.

Suggestion of improved docstring:

Closes the job by means of requesting the underlying asyncio Task to be [cancelled](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel).

Way to cancel jobs when exception is raised on one of them

Hello,

I'm trying to implement a Trio-Nursery-like API on top of the aiojobs Scheduler, but I can't find a clean way to cancel all tasks when an exception is raised in one of them. The catch is that exception_handler is a sync function, so I can't call await scheduler.close() there.

I have solved it by using asyncio.Event and an extra task just to kill the other ones but this is not very elegant (example: https://gist.github.com/mprymek/133244919964aa18f57fbda3b55df738)

I'm not sure what the right solution would be, I see at least three possibilities:

  • option to close the Scheduler when first exception occurs (implement in Scheduler._wait_failed?)
  • make exception_handler async (breaking change :( )
  • optional extra async exception_handler to the existing one (probably confusing)

There are probably a few more...

I have not studied aiojobs source well enough to judge this and supply a PR, I'm sorry.

P.S. thanks for your work on aiojobs!
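
A minimal sketch of the Event-based workaround described above (the handler signature follows the aiojobs docs; the watcher is inlined here rather than run as a separate task):

import asyncio

import aiojobs


async def boom():
    raise RuntimeError("fail fast")


async def main():
    failed = asyncio.Event()

    def exception_handler(scheduler, context):
        # Sync callback: just signal, never await anything here.
        failed.set()

    scheduler = aiojobs.Scheduler(exception_handler=exception_handler)
    await scheduler.spawn(boom())

    # Close the scheduler (cancelling remaining jobs) as soon as any job fails.
    await failed.wait()
    await scheduler.close()


asyncio.run(main())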

"Non-thread-safe operation invoked on an event loop other than the current one" and "RuntimeError: Event loop is closed"

Versions:

aiojobs 1.0.0
Python 3.8
Lubuntu 20.04
pytest-asyncio 0.16.0

Set PYTHONASYNCIODEBUG=1

Traceback

Exception ignored in: <coroutine object Scheduler._wait_failed at 0x7f56ba3fd440>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 129, in _wait_failed
    task = await self._failed_tasks.get()
  File "/usr/local/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Scheduler._wait_failed at 0x7f56c8680a40>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 129, in _wait_failed
    task = await self._failed_tasks.get()
  File "/usr/local/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Scheduler._wait_failed at 0x7f56b9819cc0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 129, in _wait_failed
    task = await self._failed_tasks.get()
  File "/usr/local/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
Exception ignored in: <coroutine object Scheduler._wait_failed at 0x7f56ba3f7ec0>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 129, in _wait_failed
    task = await self._failed_tasks.get()
  File "/usr/local/lib/python3.8/asyncio/queues.py", line 163, in get
    await getter
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
=============================== warnings summary ===============================
tests/foobar/spameggspy::test_my_test_name
  /usr/local/lib/python3.8/site-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object Scheduler._wait_failed at 0x7f56d1791a40>
  
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/asyncio/queues.py", line 163, in get
      await getter
  GeneratorExit
  
  During handling of the above exception, another exception occurred:
  
  Traceback (most recent call last):
    File "/usr/local/lib/python3.8/site-packages/aiojobs/_scheduler.py", line 129, in _wait_failed
      task = await self._failed_tasks.get()
    File "/usr/local/lib/python3.8/asyncio/queues.py", line 165, in get
      getter.cancel()  # Just in case getter is not done yet.
    File "/usr/local/lib/python3.8/asyncio/base_events.py", line 721, in call_soon
      self._check_thread()
    File "/usr/local/lib/python3.8/asyncio/base_events.py", line 758, in _check_thread
      raise RuntimeError(
  RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

Reproducible sample

Sorry, I can't give a reproducible sample right now because the project is huge and I don't understand what exactly raises the warning. Any ideas how I can debug this problem by myself? Any suggestions?

CancelledErrors are not handled correctly even with aiojobs

We are running a mobile internet app with around 100,000 users accessing it each day; the web server receives roughly 3,000 hits per second.
Since all user access needs to happen in real time, we use the aiohttp framework to handle all of it.
Every second the error log records around 5-6 CancelledErrors, which confuses us very much. We cannot figure out the real cause of why so many CancelledErrors are thrown.
After several rounds of discussion on GitHub, we learned that aiohttp 2.3.0 addressed this common problem by using aiojobs to reduce the number of CancelledErrors.
But after implementing 2.3.0, nothing has changed; the number of CancelledErrors remains the same.

The errors cause saves to Redis to fail, and once errors accumulate to a certain amount, all operations to Redis respond with "error".

It hurts, and we hope for a good solution to get rid of all the CancelledErrors.

More information is in the thread that was just moved from aio-libs/aiohttp (aio-libs/aiohttp#2466).
We are actually losing data in Redis,
we also get many errors logged,
and our service was shut down.

Closing the scheduler throws an `asyncio.CancelledError`

I've noticed that sometimes when I have done await scheduler.close(), there's an asyncio.CancelledError thrown.

This isn't documented at all.

But it sometimes doesn't happen either.

What would cause await scheduler.close() to throw that error?

So far I've resolved to do:

try:
    await scheduler.close()
except asyncio.CancelledError:
    pass

This is happening even for schedulers that have no jobs whatsoever.

What's weird is that when I try to reproduce it with a simple script, it doesn't happen:

import asyncio
import aiojobs

async def f ():
    scheduler = await aiojobs.create_scheduler()
    try:
        await scheduler.close()
        print('closed')
    except Exception as e:
        print(e.__class__.__name__)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(f())
    print('done')

main()

Allow directly instantiating the Scheduler class

Documentation says:

User should never instantiate the class but call create_scheduler() async function.

However I don't understand why this restriction. In the age of type checking, it makes usage of this scheduler weird, because you cannot have await in another class' __init__, which means that you have to defer creating a Scheduler object as a member of an object until later.

For example:

class MyWorker:

    def __init__(self):
        # this doesn't work:
        # self.scheduler: aiojobs.Scheduler = await aiojobs.create_scheduler()
        # so instead I have to do this
        self.scheduler: Optional[aiojobs.Scheduler] = None

    async def start(self):
        self.scheduler = await aiojobs.create_scheduler()

    async def close(self):
        await self.scheduler.close()

    async def do_something(self):
         ....
         assert self.scheduler is not None  # otherwise mypy will complain that scheduler can be None
         await self.scheduler.spawn(something)

Instead I would like to be able to instantiate Scheduler directly, perhaps add a start() async method to it to allow for any future async initialisation that might be needed:

class MyWorker:

    def __init__(self):
        self.scheduler = aiojobs.Scheduler()

    async def start(self):
        await self.scheduler.start()

    async def close(self):
        await self.scheduler.close()

    async def do_something(self):
         # no need to check for None!
         await self.scheduler.spawn(something)

To make this work, all we would need would be a simple PR that:

  1. add a async def start(): pass method to Scheduler;
  2. change the documentation accordingly.

Does that sound acceptable?

Timeout for pending jobs raises InvalidStateError

Environment:

  • macOS HighSierra 10.13.6
  • Python v3.6.6 with pyenv
  • aiojobs v0.2.2

How to reproduce:

import asyncio

import aiojobs


async def some_unexpectedly_long_async_job():
    await asyncio.sleep(10000)


async def reproduce_bug():
    scheduler = await aiojobs.create_scheduler(limit=1)

    jobs = await asyncio.gather(*[scheduler.spawn(some_unexpectedly_long_async_job())
                                  for _ in range(2)])
    try:
        waited_jobs = await asyncio.gather(*[job.wait(timeout=3) for job in jobs])
    except:
        # Deal with async jobs timeout
        pass

    await scheduler.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(reproduce_bug())

bug:

Exception in callback Job._done_callback(<Task cancell...at test.py:6>>)
handle: <Handle Job._done_callback(<Task cancell...at test.py:6>>)>
Traceback (most recent call last):
  File "/Users/zeniuus/.pyenv/versions/3.6.6/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/Users/zeniuus/.pyenv/versions/aiojobs-bug-test/lib/python3.6/site-packages/aiojobs/_job.py", line 138, in _done_callback
    scheduler._done(self)
  File "/Users/zeniuus/.pyenv/versions/aiojobs-bug-test/lib/python3.6/site-packages/aiojobs/_scheduler.py", line 141, in _done
    new_job._start()
  File "/Users/zeniuus/.pyenv/versions/aiojobs-bug-test/lib/python3.6/site-packages/aiojobs/_job.py", line 134, in _start
    self._started.set_result(None)
asyncio.base_futures.InvalidStateError: invalid state

How to call my 'non asyncio' class from aiojobs

Dear All.

I just jumped into the async world.
I need to fetch metrics from some hosts.
Currently I do it in a sync way.
Every single loop (fetching 32 hosts) takes about 20 seconds.
That is, for each host I only get a new metric every 20 seconds.

I need to get a metric (for each host) every 1 second.

Here is my noodle script:

#!/usr/bin/env python

import os
import sys
import datetime, time
import traceback
import json
import logging
import configparser


import json
import uuid
import websocket


import asyncio
import aiojobs


class xo:
    def __init__(self, server, email=None, password=None):
        self._server = server + '/api/'
        self._email = email
        self._password = password

        self._ws = websocket.WebSocket()
        self._ws.connect(self._server)
        payload = {
            "method": "session.signInWithPassword",
            "params": {"email": self._email, "password": self._password},
            "jsonrpc": "2.0",
            "id": uuid.uuid1().hex
        }
        resp=self.call(payload)


    def call(self,payload,timeout=10):
        self._ws.send(json.dumps(payload))
        start_time = time.time()
        resp = None
        while time.time() < (start_time + timeout):
            resp = json.loads(self._ws.recv())
            if ('id' not in resp) or resp['id'] != payload['id']:
                resp = None
                continue
            else:
                break
        return resp

    def getAllHost(self):
        payload = {
            "method": "xo.getAllObjects",
            "params": {'filter':{'type':'host'}},
            "jsonrpc": "2.0",
            "id": uuid.uuid1().hex
        }

        resp=self.call(payload)
        return resp['result']

    def getHostStat(self,id):
        payload = {
            "method": "host.stats",
            "params": {'host':id},
            "jsonrpc": "2.0",
            "id": uuid.uuid1().hex
        }
        resp=self.call(payload)
        return resp['result']

    def getAllStat(self):
        hosts=self.getAllHost()
        for k,v in hosts.items():
            hosts[k]['stat']={'error':None, 'data':None}
            try :
                hosts[k]['stat']['data']=self.getHostStat(k)['stats']
            except Exception as e :
                hosts[k]['stat']['error']=str(e)

        return hosts


async def coro(seq,id,rawstat):
    print('Start Of {} / {} at {}'.format(rawstat['name_label'], seq,time.time()))
    try :
        stats = myxo.getHostStat(id)
    except Exception as e:
        pass
    print('\t End Of {} / {} at {}'.format(rawstat['name_label'], seq,time.time()))


async def main():
    scheduler = await aiojobs.create_scheduler()
    rawhosts=myxo.getAllHost()
    seq=0
    while True :
        for k,v in rawhosts.items():
            await scheduler.spawn(coro(seq,k,v))
        await asyncio.sleep(1.0)
        seq+=1

if __name__ == '__main__':
    iniconfig = configparser.ConfigParser()
    try :
        iniconfig.read(sys.argv[1])
        print('Using config from: {}'.format(sys.argv[1]))
    except Exception as e:
        try :
            iniconfig.read('./xostat.ini')
            print('Using config from: ./xostat.ini')
        except Exception as e:
            sys.exit('No configuration file defined/found')

    #Init Connection to XO-Server
    try :
        wsuri='ws://{}:{}'.format(iniconfig['xo']['address'],
            iniconfig['xo'].get('port',80))
        wsuser=iniconfig['xo']['user']
        wspwd=iniconfig['xo']['password']
    except Exception as e :
        sys.exit('Wrong formated config file')

    myxo=xo(wsuri,wsuser,wspwd)

    asyncio.get_event_loop().run_until_complete(main())

and the sniped result :

Using config from: ./xostat.ini
Start Of Kalingga / 0 at 1608360338.888853
     End Of Kalingga / 0 at 1608360339.4845614
Start Of Jayabaya / 0 at 1608360339.4847863
     End Of Jayabaya / 0 at 1608360339.821991

... (and bunch of it)

Start Of Kalingga / 1 at 1608360366.2312956
     End Of Kalingga / 1 at 1608360367.1854613
Start Of Jayabaya / 1 at 1608360367.1857102
     End Of Jayabaya / 1 at 1608360367.476213

Note that there is still a 28-second delay between the 1st start of Kalingga and the 2nd start of Kalingga, even though a single fetch of Kalingga only takes about 1 second.

Kindly please tell me how to make my script run fully async.

Sincerely
-bino-

Schedule jobs from non-async code

Hi @asvetlov – thanks for another awesome aio library!

I was really excited to discover aiojobs as I believed it would allow me to clean up a lot of custom scheduling code I've written in my asyncio applications. Unfortunately, though, aiojobs doesn't actually support my single most-common use case: fire-and-forget background tasks.

I have lot of instances where I need to kick off an asynchronous background task from synchronous code. That usually ends up looking something like this:

def some_sync_fn(*args, **kwargs):
    ...
    task = asyncio.ensure_future(some_async_fn('foo', 'bar', 123))
    ...

Obviously though this doesn't have the nice features provided by aiojobs. I got all excited to switch to aiojobs, but Scheduler::spawn is a coroutine – I can't call it from my sync code.

Well, that's not entirely accurate. Scheduler::spawn is marked async, but it never actually calls any async code:

async def spawn(self, coro):
    # The method is not a coroutine
    # but let's keep it async for sake of future changes
    # Migration from function to coroutine is a pain
    if self._closed:
        raise RuntimeError("Scheduling a new job after closing")
    job = Job(coro, self, self._loop)
    should_start = (self._limit is None or
                    self.active_count < self._limit)
    self._jobs.add(job)
    if should_start:
        job._start()
    else:
        self._pending.append(job)
    return job

The comment calls this out as an accommodation for possible future changes, but I would argue that this method should never be a coroutine. Making it so keeps consumers from scheduling background tasks from their sync code, which is (IMO at least) one of the major features a scheduler should offer. Is there something you have in mind for the future that requires spawn to be a coroutine?

Although I think it makes more sense for spawn to simply stop being a coroutine, I'd even settle for a different Scheduler method that sets up the task and returns immediately.

What do you think?
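
For the record, spawn can still be kicked off from synchronous code today by wrapping the coroutine in a task; a minimal sketch, assuming a running event loop and a hypothetical some_async_fn:

import asyncio

import aiojobs


async def some_async_fn(*args):
    await asyncio.sleep(0)


def some_sync_fn(scheduler: aiojobs.Scheduler):
    # spawn() is a coroutine, so from sync code wrap it in a task;
    # the resulting task completes with the Job handle.
    return asyncio.ensure_future(scheduler.spawn(some_async_fn('foo', 'bar', 123)))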

Add support for positional and keyword arguments to @atomic

I need to pass extra arguments to my view.
For example:

from functools import partial

from aiohttp import web
from aiojobs.aiohttp import atomic, setup


NEW, PREPARED, RESOLVED, NOT_RESOLVED = range(4)


def setup_routes(app):
    app.add_routes([
        web.post(
            r'/api/v1/tasks/{id:\d+}/to_new',
            partial(change_status, target=NEW)
        ),
        web.post(
            r'/api/v1/tasks/{id:\d+}/to_prepared',
            partial(change_status, target=PREPARED)
        ),
        web.post(
            r'/api/v1/tasks/{id:\d+}/to_resolved',
            partial(change_status, target=RESOLVED)
        ),
        web.post(
            r'/api/v1/tasks/{id:\d+}/to_not_resolved',
            partial(change_status, target=NOT_RESOLVED)
        )
    ])


@atomic
async def change_status(request, target=None):
    # Some logic depending on the target
    # ...
    return web.Response(body='Task changed status to {}'.format(target))


app = web.Application()
setup_routes(app)
setup(app)
web.run_app(app)

When I am using @atomic I get an error like:
TypeError: wrapper() got an unexpected keyword argument 'target'

Changes in @atomic can help to solve this problem:

def atomic(coro):
    @wraps(coro)
    async def wrapper(request, *args, **kwargs):
        if isinstance(request, View):
            # Class Based View decorated.
            request = request.request

        job = await spawn(request, coro(request, *args, **kwargs))
        return await job.wait()
    return wrapper

Is it acceptable?

Long running task.

Hello. Is it a good idea to use this library for long-running tasks?
For example, in a web application:
A user makes a request, the server creates a task via scheduler.spawn, and then the user can query the server about the task's status.
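
This request/poll pattern is a common use of aiojobs; just be aware the scheduler's close_timeout (0.1s by default) will cancel unfinished jobs on shutdown. A minimal sketch (the routes, the app['jobs'] registry, and long_task are hypothetical):

import asyncio
import uuid

from aiohttp import web
from aiojobs.aiohttp import setup, get_scheduler


async def long_task():
    await asyncio.sleep(60)  # the actual long-running work


async def start_task(request):
    job = await get_scheduler(request).spawn(long_task())
    task_id = uuid.uuid4().hex
    request.app['jobs'][task_id] = job
    return web.json_response({'task_id': task_id})


async def task_status(request):
    job = request.app['jobs'][request.match_info['task_id']]
    return web.json_response({'active': job.active, 'closed': job.closed})


app = web.Application()
app['jobs'] = {}
app.router.add_post('/tasks', start_task)
app.router.add_get('/tasks/{task_id}', task_status)
setup(app)
web.run_app(app)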

Dangling jobs once active and pending limits hit

Goal:

Gracefully drain jobs from the scheduler when aiohttp is shutting down.

Repro:

  1. Execute:
import asyncio
import logging

import aiojobs
from aiohttp import web


async def coro(app):
    for i in range(0, 1000):
        job = await app["scheduler"].spawn(dummy())
        print(job)


async def dummy():
    await asyncio.sleep(5)


async def start_scheduler(app):
    scheduler = aiojobs.create_scheduler(
        limit=5, pending_limit=5
    )

    app["scheduler"] = await scheduler


async def stop_scheduler(app):
    for i in range(1, 100):
        print(f"Tasks active: {app['scheduler'].active_count}")
        print(f"Tasks pending: {app['scheduler'].pending_count}")
        if len(app["scheduler"]) > 0:
            await asyncio.sleep(1)
            for job in app['scheduler']:
                print(job, job.active, job.pending)
        else:
            break

    await app["scheduler"].close()


async def start_jobs(app):
    asyncio.create_task(coro(app))


async def init_app():
    app = web.Application()
    app.on_startup.append(start_scheduler)
    app.on_startup.append(start_jobs)
    app.on_shutdown.append(stop_scheduler)

    return app



def main():
    logging.basicConfig(level=logging.DEBUG,
                        format="[%(asctime)s] %(levelname)s %(message)s",
                        )

    app = init_app()
    web.run_app(app)


if __name__ == "__main__":
    main()
  2. Once 10 or more jobs are started, hit ctrl-c
  3. Wait until the app terminates

Expected result

No dangling tasks.

Actual result

One task is dangling.

pip install aiojobs error: <class 'FileNotFoundError'>: [Errno 2] No such file or directory

[~/Documents]$ pip --verbose install aiojobs
Querying PyPI ...
Using aiojobs==0.2.2...
A binary distribution is available and will be used.
Downloading package ...
Opening: https://files.pythonhosted.org/packages/a7/8e/1d41d21e7cd09d744921490be99a9e2cfa498ae1af36b9c3ee0c3395d2c9/aiojobs-0.2.2-py3-none-any.whl

Save as: /private/var/mobile/Containers/Data/Application/106690BE-96ED-4657-8531-E99119A7CD8F/tmp//aiojobs-0.2.2-py3-none-any.whl (24664 bytes)
24664 [100.00%]
Installing wheel: aiojobs-0.2.2-py3-none-any.whl...
Extracting wheel..
Extraction finished, running handlers...
Running handler 'WHEEL information checker'...
Wheel generated by: flit 1.2.1
Running handler 'dependency handler'...
Running handler 'top_level.txt installer'...
Cleaning up...
<class 'FileNotFoundError'>: [Errno 2] No such file or directory: '/private/var/mobile/Containers/Data/Application/106690BE-96ED-4657-8531-E99119A7CD8F/tmp/wheel_tmp/aiojobs-0.2.2-py3-none-any.whl/aiojobs-0.2.2.dist-info/top_level.txt'

Why does it print the exception stack twice?

Code example

import asyncio

import aiojobs


async def f():
    try:
        await asyncio.sleep(2)
    except asyncio.CancelledError:
        print('canceled')
    else:
        print('not canceled')


async def err():
    raise Exception


async def main():
    scheduler = await aiojobs.create_scheduler(close_timeout=None)

    await scheduler.spawn(f())
    await scheduler.spawn(err())

    try:
        await scheduler
    except:
        print('exception occurred, cancel')
        await scheduler.close()
    else:
        print('all good')


if __name__ == '__main__':
    asyncio.run(main())

Result

exception occurred, cancel
Job processing failed
job: <Job closed coro=<<coroutine object err at 0x000001E5768B1D40>>>
Traceback (most recent call last):
  File "C:\Users\Asus\.virtualenvs\test-2uqMFfYk\lib\site-packages\aiojobs\_job.py", line 89, in _close
    await self._task
  File "C:/Users/Asus/Documents/Projects/python/test/sub.py", line 16, in err
    raise Exception
Exception
Job processing failed
job: <Job closed coro=<<coroutine object err at 0x000001E5768B1D40>>>
Traceback (most recent call last):
  File "C:\Users\Asus\.virtualenvs\test-2uqMFfYk\lib\site-packages\aiojobs\_job.py", line 89, in _close
    await self._task
  File "C:/Users/Asus/Documents/Projects/python/test/sub.py", line 16, in err
    raise Exception
Exception
cancelled

Why does it print the stack trace twice?
