omnilib / aiomultiprocess

Take a modern Python codebase to the next level of performance.

Home Page: https://aiomultiprocess.omnilib.dev

License: MIT License

Python 97.87% Makefile 2.13%
async asyncio hacktoberfest multiprocessing python python3


aiomultiprocess's Issues

Implement async iterables for map/starmap

Description

Desired use case:

async for result in pool.map(coro, values):
    ...

This could be achieved by having Pool.map return a special object that, when awaited, waits for everything and returns all results at once, or, when used in an async for loop, yields each item in order as it becomes ready.
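A minimal sketch of what such an object could look like (the MapResult name and shape are hypothetical, not part of the library):

import asyncio
from typing import Any, AsyncIterator, Awaitable, List

class MapResult:
    # hypothetical: awaitable for all results at once,
    # or async-iterable for results in submission order
    def __init__(self, futures: List[Awaitable[Any]]):
        self.futures = futures

    def __await__(self):
        # awaiting the object gathers every result into a list
        return asyncio.gather(*self.futures).__await__()

    async def __aiter__(self) -> AsyncIterator[Any]:
        # iterating yields each result as soon as its future is done
        for fut in self.futures:
            yield await fut

With that shape, both results = await pool.map(coro, values) and async for result in pool.map(coro, values) would work from the same return value.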

PoolTest:test_pool_worker_{exceptions,max_tasks,stop} are flaky and get the test suite stuck

Description

We are running into a problem on NixOS where the test suite sometimes fails in these three tests, which causes the whole test suite to hang indefinitely.

/nix/store/66fbv9mmx1j4hrn9y06kcp73c3yb196r-python3-3.8.9/lib/python3.8/multiprocessing/queues.py:110: Empty
=========================== short test summary info ============================
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_exceptions
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_max_tasks - ...
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_stop - _queu...
=================== 3 failed, 28 passed, 2 skipped in 16.45s ===================
building of '/nix/store/lhmnh05x7gs28s4skngm9rfn0q3flic3-python3.8-aiomultiprocess-0.9.0' timed out after 7200 seconds of silence

If I disable these tests using pytest -k, the remaining tests complete just fine.

Details

  • OS: NixOS (Linux 5.10.40)
  • Python version: 3.8.9
  • aiomultiprocess version: 0.9.0
  • Can you repro on master? Yes, tested on 235321f
  • Can you repro in a clean virtualenv? We build in a sandbox, they're always clean

0.7.0 issue running the examples in the README

Does not occur in 0.6.1

Description

RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.

    This probably means that you are not using fork to start your
    child processes and you have forgotten to use the proper idiom
    in the main module:

        if __name__ == '__main__':
            freeze_support()
            ...

    The "freeze_support()" line can be omitted if the program
    is not going to be frozen to produce an executable.
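For reference, the usual fix on spawn-based platforms is to guard all process creation behind the main-module check. A sketch of the README example with that guard added (otherwise unchanged):

import asyncio
from aiohttp import request
from aiomultiprocess import Worker

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    p = Worker(target=get, args=("https://jreese.sh",))
    return await p

if __name__ == "__main__":
    # the guard keeps freshly spawned children from re-running this block
    asyncio.run(main())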

Details

  • OS: macOS Catalina
  • Python version: 3.7
  • aiomultiprocess version: 0.7.0
  • Can you repro on master? Haven't tried; used the PyPI version
  • Can you repro in a clean virtualenv? yes

No benefit to running CPU-bound tasks concurrently with aiomultiprocess?

Description

I have a task that is both CPU-bound and IO-bound, like the toy code below.
When I ran it, I found that the CPU only ran at 100% on a single core, not on as many cores as there are processes in the Pool.
Is there no benefit to running CPU-bound tasks concurrently with aiomultiprocess, or did I write the code wrong?

import asyncio
import multiprocessing
from datetime import datetime

from aiohttp import request
from aiomultiprocess import Pool, Process

def fib(x):
    """Recursive function of Fibonacci number"""
    if x==0:
        return 0
    elif x==1:
        return 1
    else:
        return fib(x-1)+fib(x-2)

async def get(url):
    # async with request("GET", url) as response:
    #     await asyncio.sleep(1)
    #     return await response.text("utf-8")
    print('url ' + str(multiprocessing.current_process()) + '  ' + str(datetime.now()))
    await asyncio.sleep(5)
    fib(30)
    print('url ' + str(multiprocessing.current_process()) + '  ' + str(datetime.now()))

async def main():
    urls = ["https://jreese.sh", "https://www.baidu.com", "a", "b", "c", "d"]
    # p = Process(target=get, args=("https://jreese.sh", "https://www.baidu.com",))
    # await p
    print(datetime.now())
    async with Pool(4) as pool:
        result = await pool.map(get, urls)
        print(result)
        print(datetime.now())

# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())

Details

  • OS: MacOS 10.13.6 / CentOS 7
  • Python version: 3.7.2
  • aiomultiprocess version: 0.5.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

ClientConnectorError while using proxies in aiohttp requests with aiomultiprocess.Pool

Code

from asyncio import gather
import asyncio
from aiomultiprocess import Pool, Worker
from aiohttp import ClientSession
import aiohttp



async def get(url, host, port, user, passwd):
    async with aiohttp.ClientSession() as session:
        proxy_auth = aiohttp.BasicAuth('user', 'passwd')
        async with session.get("http://python.org",
                               proxy="http://host:port",
                               proxy_auth=proxy_auth,
                               ) as resp:
            print(await resp.text())

async def main(host, port, user, passwd):
    urls=['http://python.org']
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            print('done')

def start(host, port, user, passwd):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main(host, port, user, passwd))


if __name__ == "__main__":
    start(host, port, user, passwd)

Details

  • OS: Windows 10
  • Python version: 3.9.5
  • aiomultiprocess version: 0.9.0

Full traceback

Traceback (most recent call last):
  File "test.py", line 31, in <module>
    start()
  File "test.py", line 27, in start
    asyncio.run(main())
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "test.py", line 21, in main
    async for result in pool.map(get, urls):
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 145, in results_generator
    yield (await self.pool.results([task_id]))[0]
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 308, in results
    raise ProxyException(tb)
aiomultiprocess.types.ProxyException: Traceback (most recent call last):
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 969, in _wrap_create_connection
    return await self._loop.create_connection(*args, **kwargs)  # type: ignore  # noqa
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 1050, in create_connection
    transport, protocol = await self._create_connection_transport(
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 1080, in _create_connection_transport
    await waiter
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 395, in _loop_writing
    self._write_fut = self._loop._proactor.send(self._sock, data)
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 529, in send
    self._register_with_iocp(conn)
  File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 718, in _register_with_iocp
    _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
OSError: [WinError 87] The parameter is incorrect

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 110, in run
    result = future.result()
  File "C:\Users\Administrator\Desktop\sizeer\test.py", line 12, in get
    async with session.get("http://python.org",
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\client.py", line 1117, in __aenter__
    self._resp = await self._coro
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\client.py", line 520, in _request
    conn = await self._connector.connect(
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 535, in connect
    proto = await self._create_connection(req, traces, timeout)
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 890, in _create_connection
    _, proto = await self._create_proxy_connection(req, traces, timeout)
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 1139, in _create_proxy_connection
    transport, proto = await self._wrap_create_connection(
  File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 975, in _wrap_create_connection
    raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host python.org:443 ssl:default [The parameter is incorrect]

Description

I get this error whether I use Worker or Pool. Without proxies it works fine, and adding verify_ssl=False to the request doesn't resolve the issue either. What's more, without Pool or Worker, plain asyncio works just fine both with and without proxies. However, with Python 3.8.5 on macOS it works, and I'm not sure why (downgrading Python on Windows didn't resolve the problem). I suppose it's an issue with aiomultiprocess. Thanks in advance for your help!

What's the magic of hard-coding 0.005 in `await asyncio.sleep()`?

Description

Sorry for this boring question, but is there any reason it has to be 0.005?

Maybe a module- or package-level constant instead? This especially concerns the loop() of Pool, at the comment # let someone else do some work for once:
https://github.com/jreese/aiomultiprocess/blob/ef791e96af4d8fd802fecb20353f5abdc002ee0a/aiomultiprocess/core.py#L292

P.S. It seems the Process class is missing a terminate member function.

P.P.S. Maybe #5 refers to asyncio's fork() support; Python 3.6.1+ seems to work, see my last reply in #5.

Details

  • OS:
  • Python version:
  • aiomultiprocess version:
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

How to pass two iterable parameters?

Description

Hi, great library, thanks!

I want to check whether a URL has an image, and my check code is:

async def get_image(url:'http://test.com/',dirs:'[a.jpg,b.jpg,c.jpg]'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            async with session.get(url=url+dir) as resp:
                if resp.status == 200:
                    return await resp.content()
                    # only need one exists image url
async def main(url,dirs):
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image, ??, ??)
        # how do I set this here?
    print(result)

if __name__ == '__main__':
    urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/',]
    dirs = ['a.jpg','b.jpg','c.jpg']
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(urls,dirs))

I want to use the pool to accomplish this mission, but I don't know how to design it.

So I redesigned the code:

dirs = ['a.jpg', 'b.jpg', 'c.jpg']
async def get_image(url:'http://test.com/'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            print(url+dir)
            async with session.get(url=url+dir,timeout=3) as resp:
                if resp.status == 200:
                    return await resp.content()
                    # only need one exists image url

async def main():
    urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/']
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image,urls)
    print(result)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Now it succeeds, but I want to know: how can I pass two iterable parameters in main()?
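Assuming the goal is one (url, dirs) pair per task: Pool.starmap takes an iterable of argument tuples, so one approach is to pair every URL with the same dirs list. A sketch, not tested against this exact code (resp.read() stands in for the resp.content() call above):

import asyncio
import aiohttp
import aiomultiprocess

async def get_image(url, dirs):
    async with aiohttp.ClientSession() as session:
        for d in dirs:
            async with session.get(url + d, timeout=3) as resp:
                if resp.status == 200:
                    return await resp.read()  # first existing image wins

async def main():
    urls = ['http://1.com/', 'http://2.com/', 'http://3.com/', 'http://4.com/']
    dirs = ['a.jpg', 'b.jpg', 'c.jpg']
    async with aiomultiprocess.Pool() as pool:
        # starmap unpacks each (url, dirs) tuple into get_image(url, dirs)
        results = await pool.starmap(get_image, [(url, dirs) for url in urls])
    print(results)

if __name__ == '__main__':
    asyncio.run(main())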

Details

  • OS: Windows 10
  • Python version: 3.6
  • aiomultiprocess version: 0.5.0

EOFError Python 3.7.5

Description

Run the demo code:

import asyncio
from aiohttp import request
from aiomultiprocess import Worker

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    p = Worker(target=get, args=("https://jreese.sh", ))
    response = await p

# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

This error occurs:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 114, in _main
    prepare(preparation_data)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/Users/Matrix/Desktop/eoftest.py", line 15, in <module>
    loop.run_until_complete(main())
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Users/Matrix/Desktop/eoftest.py", line 10, in main
    p = Worker(target=get, args=("https://jreese.sh", ))
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 204, in __init__
    super().__init__(*args, process_target=Worker.run_async, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 103, in __init__
    namespace=get_manager().Namespace(),
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 29, in get_manager
    _manager = context.Manager()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 563, in start
    self._process.start()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Traceback (most recent call last):
  File "eoftest.py", line 15, in <module>
    loop.run_until_complete(main())
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "eoftest.py", line 10, in main
    p = Worker(target=get, args=("https://jreese.sh", ))
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 204, in __init__
    super().__init__(*args, process_target=Worker.run_async, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 103, in __init__
    namespace=get_manager().Namespace(),
  File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 29, in get_manager
    _manager = context.Manager()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 56, in Manager
    m.start()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 567, in start
    self._address = reader.recv()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
    raise EOFError
EOFError

Details

  • OS: macOS 10.15.3
  • Python version: Python 3.7.5
  • aiomultiprocess version: 0.7.0
  • Can you repro on master? yes
  • Can you repro in a clean virtualenv? yes

Post pycon code example

Description

Post pycon code example

Details

  • OS:
  • Python version:
  • aiomultiprocess version:
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

Pass instance method to map/starmap

Description

Can we provide an instance method instead of a static function?

async with Pool() as pool:
    await pool.map(self.write_data, node)

I get back this error and am not sure what to make of it.

Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

Can someone please help me out?
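The traceback comes from multiprocessing pickling the callable, and a bound method drags its whole instance along, including unpicklable attributes like thread locks. A common workaround, sketched below (DataWriter and conn_string are hypothetical stand-ins for whatever self carries), is a module-level coroutine plus the picklable data it needs:

import asyncio
from aiomultiprocess import Pool

# module-level coroutine: pickled by reference, unlike a bound method
async def write_data(node, conn_string):
    # rebuild unpicklable resources (clients, locks) inside the worker
    writer = DataWriter(conn_string)  # hypothetical helper
    await writer.write(node)

async def run(nodes, conn_string):
    async with Pool() as pool:
        await pool.starmap(write_data, [(n, conn_string) for n in nodes])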

Details

  • OS: Ubuntu-20
  • Python version: 3.8
  • aiomultiprocess version: 0.9.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

ValueError: cannot find context for 'fork'

Description

When importing aiomultiprocess, I got this error.

Details

from aiomultiprocess import Pool

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
    from aiomultiprocess import Pool
  File "C:\Users\patlo\Anaconda3\lib\site-packages\aiomultiprocess\__init__.py", line 11, in <module>
    from .core import Pool, Process, Worker
  File "C:\Users\patlo\Anaconda3\lib\site-packages\aiomultiprocess\core.py", line 35, in <module>
    context = multiprocessing.get_context("fork")
  File "C:\Users\patlo\Anaconda3\lib\multiprocessing\context.py", line 238, in get_context
    return super().get_context(method)
  File "C:\Users\patlo\Anaconda3\lib\multiprocessing\context.py", line 192, in get_context
    raise ValueError('cannot find context for %r' % method)
ValueError: cannot find context for 'fork'

  • OS:
  • Python version:3.6.3
  • aiomultiprocess version:0.4.0
  • Can you repro on master? Yes
  • Can you repro in a clean virtualenv? NO

How to execute async generator in Pool

Description

Hi, great library, thanks!

I want to run an async generator in a process pool using the aiomultiprocess lib. How can I do that? Any pointers? Currently I'm getting an error like:
File "/home/rohankar/anaconda3/lib/python3.6/site-packages/aiomultiprocess/core.py", line 93, in __init__ raise ValueError(f"target must be coroutine function") ValueError: target must be coroutine function

Script that I tried:

import asyncio


async def ait(nr):
    for i in range(nr):
        await asyncio.sleep(0.1)
        yield i


from aiomultiprocess import Worker


async def main():
    # This Works
    # async for i in ait(10):
    #     print(i)

    # This throw error
    p = Worker(target=ait, args=(20,))
    p.start()
    print(await p)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
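Since Process and Worker targets must be coroutine functions, one workaround is to wrap the async generator in a coroutine that drains it and returns the collected items. A sketch (the whole list is pickled back to the parent, so it has to fit in memory):

import asyncio
from aiomultiprocess import Worker

async def ait(nr):
    for i in range(nr):
        await asyncio.sleep(0.1)
        yield i

# a plain coroutine wrapper is a valid Worker target
async def collect_ait(nr):
    return [i async for i in ait(nr)]

async def main():
    p = Worker(target=collect_ait, args=(20,))
    p.start()
    print(await p)

if __name__ == "__main__":
    asyncio.run(main())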

Details

  • OS: Ubuntu 18.04
  • Python version: 3.6
  • aiomultiprocess version: 0.5.0
  • Can you repro on master? didn't try
  • Can you repro in a clean virtualenv? didn't try

Problem when using aiomultiprocess with aiopipe

Description

Hi, I'm trying to combine aiomultiprocess with aiopipe, whose demo shows that it works well with multiprocessing.Process (and indeed it does, since I've tried it).

I've modified aiopipe's duplex pipe demo into the snippet below. The only difference is that I changed multiprocessing.Process to aiomultiprocess.Process and made it await proc.join() instead of making a blocking call. The asyncio.run part is also removed, since it's no longer needed with aiomultiprocess.

Here is my snippet:

# test.py

import asyncio
from aiomultiprocess import Process
from aiopipe import aioduplex

async def test(ipc):
    async with ipc.open() as (rx, tx):
        tx.write(b"hello")
        rep = await rx.readline()
        tx.write(rep.upper())

async def main():
    mainpipe, chpipe = aioduplex()
    with chpipe.detach() as chpipe:
        proc = Process(target=test, args=(chpipe,))
        proc.start()

    async with mainpipe.open() as (rx, tx):
        req = await rx.read(5)
        tx.write(req + b" world\n")
        msg = await rx.readline()
    
    await proc.join()
    print(msg)

if __name__ == "__main__":
    asyncio.run(main())

But it failed with a Bad file descriptor error.

aio process 18680 failed
Traceback (most recent call last):
  File "<project_dir>/venv/lib/python3.7/site-packages/aiomultiprocess/core.py", line 132, in run_async
    result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "<project_dir>/test.py", line 10, in test
    async with ipc.open() as (rx, tx):
  File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 267, in open
    async with self._rx.open() as rx, self._tx.open() as tx:
  File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 133, in open
    transport, stream = await self._open()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 196, in _open
    os.fdopen(self._fd))
  File "/usr/lib/python3.7/os.py", line 1026, in fdopen
    return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor
Process SpawnProcess-2:
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<project_dir>/venv/lib/python3.7/site-packages/aiomultiprocess/core.py", line 132, in run_async
    result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
  File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "<project_dir>/test.py", line 10, in test
    async with ipc.open() as (rx, tx):
  File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 267, in open
    async with self._rx.open() as rx, self._tx.open() as tx:
  File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
    return await self.gen.__anext__()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 133, in open
    transport, stream = await self._open()
  File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 196, in _open
    os.fdopen(self._fd))
  File "/usr/lib/python3.7/os.py", line 1026, in fdopen
    return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor

The error was raised from the child process when it tried to open the pipes at the line ipc.open() in the function test. The backtrace shows that self._fd, which was obtained from os.pipe(), is a bad file descriptor. (source)

def aiopipe() -> Tuple["AioPipeReader", "AioPipeWriter"]:
    rx, tx = os.pipe()
    return AioPipeReader(rx), AioPipeWriter(tx)

I've dug a little into both aiomultiprocess and aiopipe; neither is a large codebase, so code tracing isn't painful.

I see aiomultiprocess uses multiprocessing.Process under the hood (did I misunderstand anything?).
https://github.com/jreese/aiomultiprocess/blob/aa623804a668ee2d6cd0a8b56a630d1e9bdaef78/aiomultiprocess/core.py#L107-L113

Since aiopipe plays well with multiprocessing but not with aiomultiprocess, I need some help figuring out what I got wrong, and when and why the file descriptor was corrupted.

Thanks for the great library anyway!
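One plausible culprit, going by the Process SpawnProcess-2 lines in the traceback: aiomultiprocess is using the spawn start method here, and raw descriptors from os.pipe() are only inherited under fork, which is presumably what the plain multiprocessing demo used. A sketch of forcing fork (POSIX only) before creating the process:

import aiomultiprocess

# spawned children do not inherit os.pipe() descriptors; forked children do
aiomultiprocess.set_start_method("fork")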

Details

  • OS: Ubuntu 16.04
  • Python version: 3.7.3
  • aiomultiprocess version: 0.7.0
  • Can you repro on master? Not sure, have not tried yet.
  • Can you repro in a clean virtualenv? Yes, only aiomultiprocess and aiopipe are installed.
(venv) $ pip freeze
aiomultiprocess==0.7.0
aiopipe==0.2.2

[Question] Can one use semaphore to limit the concurrency?

Description

Hello,

I'm trying out your cool library, thanks! How would one go about limiting concurrency? I get an error in __aexit__ when using a BoundedSemaphore.
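If the goal is simply to cap how many coroutines run at once, Pool already exposes knobs for this. A sketch (the childconcurrency parameter exists in recent versions; treat its availability on 0.7.0 as an assumption):

import asyncio
from aiomultiprocess import Pool

async def fetch(url):
    await asyncio.sleep(0.1)  # stand-in for real work
    return url

async def main(urls):
    # total concurrency is roughly processes * childconcurrency
    async with Pool(processes=2, childconcurrency=8) as pool:
        return await pool.map(fetch, urls)

if __name__ == "__main__":
    print(asyncio.run(main([f"url-{i}" for i in range(32)])))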

Details

  • OS: Ubuntu 18.04
  • Python version: 3.7.5
  • aiomultiprocess version: 0.7.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

Can't use the Pool module; the program doesn't respond.

Description

When I use the Pool module, the program gets stuck without showing any error, but only on Win10 (with your sample code). CMD shows some errors after a long while.

[Error - 1:22:03 PM] Request textDocument/hover failed.
  Message: unknown document
  Code: -32000 
   at Microsoft.Python.LanguageServer.Implementation.ProjectFiles.GetEntry(Uri documentUri, Boolean throwIfMissing)
   at Microsoft.Python.LanguageServer.Implementation.ProjectFiles.GetEntry(TextDocumentIdentifier document, Nullable`1 expectedVersion, ProjectEntry& entry, PythonAst& tree)
   at Microsoft.Python.LanguageServer.Implementation.Server.Hover(TextDocumentPositionParams params, CancellationToken cancellationToken)
   at Microsoft.Python.LanguageServer.Implementation.LanguageServer.Hover(JToken token, CancellationToken cancellationToken)

But I can run this code on CentOS 6, so it seems the problem only shows up on Windows.

I've used the debugger to check this error; the code gets stuck in base_events.py at the run_forever method call. It looks like an endless loop there.

Here is my code:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    session.close()
    return response

async def request():
    url = 'https://jreese.sh'
    urls = [url for _ in range(5)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result

coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)

Details

  • OS: Windows10 & CentOS 6
  • Python version: Python 3.6.6(Win10), Python3.4(CentOS 6)
  • aiomultiprocess version: v0.5.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

How to implement progress with Pool?

Description

I'm trying to implement a progress bar of some sort, like tqdm. Is there any way to do this with Pool? Should I use Process directly?
Thanks
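On versions where pool.map supports async iteration (it does in the 0.9.x issues above), a per-result progress bar falls out naturally. A sketch using tqdm:

import asyncio
from aiomultiprocess import Pool
from tqdm import tqdm

async def work(item):
    await asyncio.sleep(0.1)
    return item

async def main(items):
    results = []
    async with Pool() as pool:
        with tqdm(total=len(items)) as bar:
            # results arrive in submission order; tick the bar as they land
            async for result in pool.map(work, items):
                results.append(result)
                bar.update(1)
    return results

if __name__ == "__main__":
    asyncio.run(main(list(range(50))))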

Details

  • OS: linux
  • Python version: 3.7.0
  • aiomultiprocess version: 0.7.0

Strange behavior, misunderstanding: creates two processes if the coroutine is from another module?

aiomultiprocess starts more than one process if the coroutine is not in the same file. Below is a short example.

import asyncio
from aiomultiprocess import Process, Worker
import aiomultiprocess
import os

aiomultiprocess.set_start_method('fork')

async def do_sleep():
    while True:
        print(os.getpid())
        await asyncio.sleep(1)

async def create_process():
    p = Process(target=do_sleep)
    p.start()
    return p

async def main():
    p = await create_process()
    
if __name__ == "__main__":
    asyncio.run(main())

If I run this and check the processes, I get the expected result: a main process and a subprocess.

$ ps auxf
159822  .... \_ python process_in_file.py
159828  ............   \_ python process_in_file.py

Now I just move the coroutine out to another file. The code stays the same.

process_from_other_module_start.py

import os
import asyncio
from aiomultiprocess import Process, Worker
import aiomultiprocess
import process_from_other_module_import

aiomultiprocess.set_start_method('fork')

async def create_process():
    p = Process(target=process_from_other_module_import.do_sleep)
    return await p

async def main():
    p = await create_process()
    
if __name__ == "__main__":
    asyncio.run(main())

process_from_other_module_import.py

import asyncio
import os

async def do_sleep():
    while True:
        print(os.getpid())
        await asyncio.sleep(1)

$ ps auxf
159980  ...   \_ python process_from_other_module_start.py
159981  ......       \_ python process_from_other_module_start.py
159986 ............. \_ python process_from_other_module_start.py

The system shows me two subprocesses, but I started just one...? Is this a bug? Can somebody explain it?

  • OS: linux
  • Python version: python3.8 and 3.7
  • aiomultiprocess version: aiomultiprocess==0.7.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

How to fix: " An attempt has been made to start a new process before the...." on Windows?

Description

I am facing this error and am not sure how to fix it:


        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

I tried calling multiprocessing.freeze_support() in main like so:

if __name__ == '__main__':
    import multiprocessing
    multiprocessing.freeze_support()
    asyncio.run(main=entry_point())

Details

I recently stumbled across this tool while trying to optimize my code; this seems like a great project!
My current problem is that I keep getting this error and am unsure why. Testing on WSLv1 (Windows Subsystem for Linux version 1) is not feasible, as the library I am using seems to be broken on WSLv1; however, that's a different issue.
Here is how I am trying to use the tool:

        from screenshot_class import take_screenshot, screenshot_handler, _chunk_list, receive
        unique_resolved_domains = list(sorted({url.split(':')[0]for url in full if ':' in url}))
        # Grab resolved subdomains
        # coroutines = [take_screenshot(url) for url in unique_resolved_domains]
        #await screenshot_handler(coroutines)
        async with aiomultiprocess.Pool() as pool:
            print('Created pool')
            #serialized_tiles = [take_screenshot(url) for url in unique_resolved_domains]
            #print(f'Length of serialized_tiles: {len(serialized_tiles)} ')
            #for chunk in _chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
            #print(f'Chunk: {chunk} and length: {len(chunk)}')
            try:
                await pool.map(take_screenshot, unique_resolved_domains)
                #await pool.map(screenshot_handler, chunk)
            except Exception as ee:
                print(f'An exception has occurred while mapping: {ee}')
                #continue

Yes I did get inspiration from a recent project

screenshot_class:

import asyncio
from pyppeteer import launch


def _chunk_list(items, chunk_size):
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


async def worker(queue):
    while True:
        # Get a "work item" out of the queue.
        stor = await queue.get()
        try:
            await stor
            queue.task_done()
            # Notify the queue that the "work item" has been processed.
        except Exception:
            queue.task_done()


async def screenshot_handler(lst):
    print('Created screenshot handler')
    queue = asyncio.Queue()

    for stor_method in lst:
        # enqueue the coroutines
        queue.put_nowait(stor_method)
    # Create ten worker tasks to process the queue concurrently.
    tasks = []
    for i in range(10):
        task = asyncio.create_task(worker(queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    await queue.join()

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)


async def receive(lst):
    for url in lst:
        await take_screenshot(url)


async def take_screenshot(url):
    url = f'http://{url}' if ('http' not in url and 'https' not in url) else url
    url.replace('www.', '')
    print(f'Taking a screenshot of: {url}')
    browser = await launch(headless=True, ignoreHTTPSErrors=True, args=["--no-sandbox"])
    page = await browser.newPage()
    try:
        await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36')
        # default wait time of 30 seconds
        await page.goto(url)
        await page.screenshot({ 'path': f'{url.replace("https://", "").replace("http://", "")}.png'})
    except Exception as e:
        print(f'Exception occurred: {e} for: {url} ')
    # No matter what happens make sure browser is closed
    await browser.close()

There are a lot of comments, as I tried a lot of different things. I am not sure if attempting to use an asyncio queue + aiomultiprocess would be a wise decision. I have not been able to find examples of using queues with aiomultiprocess, so I will need to watch the talk and hope it's there, or give it the old scientific method.

The big problem is my code seems to run fine, then it runs the program again before spitting out that error. I tried using a Kali VM and the fork method; however, I got the same error.
Apologies for writing an essay 😅

  • OS: Windows 10 v1909
  • Python version: Python 3.7.0
  • aiomultiprocess version: 0.7.0
  • Can you repro on master? Yes (Cloned the repo, imported function and it crashed)
  • Can you repro in a clean virtualenv? Not sure

RuntimeError: Cannot run the event loop while another loop is running

I am running your test_pool_concurrency() method from the PerfTest class in perf.py, but unfortunately I am getting a runtime error. See below for the detailed error:
RuntimeError: Cannot run the event loop while another loop is running
aio process 8186 failed
Traceback (most recent call last):
  File "/home/mastan/env2/lib/python3.6/site-packages/aiomultiprocess/core.py", line 103, in run_async
    self.aio_target(*self.aio_args, **self.aio_kwargs)
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 411, in run_forever
    'Cannot run the event loop while another loop is running')
RuntimeError: Cannot run the event loop while another loop is running
Process ForkProcess-225:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mastan/env2/lib/python3.6/site-packages/aiomultiprocess/core.py", line 103, in run_async
    self.aio_target(*self.aio_args, **self.aio_kwargs)
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.6/asyncio/base_events.py", line 411, in run_forever
    'Cannot run the event loop while another loop is running')
RuntimeError: Cannot run the event loop while another loop is running
aio process 8187 failed

Steps followed:
-> from perf import PerfTest
-> pt = PerfTest()
-> pt.test_pool_concurrency()

The same thing happens when I try to use aiomultiprocess in my own code. Could you please let me know if I am doing anything wrong?

Details


My present scenario:

I am using multiprocessing with pool.starmap() to process records that I read from another resource. I create processes based on the machine's cpu_count(); for example, if there are 10 processes, I create the same number of DB connections, so each process gets a (record, db connection) tuple when calling the actual method.


from multiprocessing.dummy import Pool

def process_record(record, conn):
    # actual processing
    ...

def main():
    processes = cpu_count() * 5
    db_conn_list = []
    for p in range(processes):
        db_conn_list.append(get_db_connection())
    print("Initializing the pool with " + str(processes) + " processes")
    pool = Pool(processes)
    try:
        response = self.client.get_records(ShardIterator=self.iterator)
        records = response.get('Records')
        arguments = zip(records, db_conn_list)
        pool.starmap(process_record, arguments)
    except Exception as e:
        pool.close()
        pool.join()
        time.sleep(1)

if __name__ == '__main__':
    main()

Using aiomultiprocess in my current code:

from aiomultiprocess import Pool

async def process_record(record, engine):
    processor = BitPostProcessor(conn_string, engine)
    await processor.insert_record(record)

async def main():
    processes = cpu_count() * 5
    db_conn_list = []
    for p in range(processes):
        db_conn_list.append(get_db_connection())
    print("Initializing the pool with " + str(processes) + " processes")
    pool = Pool(processes)
    try:
        response = self.client.get_records(ShardIterator=self.iterator)
        records = response.get('Records')
        arguments = zip(records, db_conn_list)
        await pool.starmap(process_record, arguments)
    except Exception as e:
        pool.close()
        pool.join()
        time.sleep(1)

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())

  • OS: Ubuntu 16.04
  • Python version: python 3.6.3
  • aiomultiprocess version:0.4.0
  • Can you repro on master? Yes, using test case
  • Can you repro in a clean virtualenv? yes, i can

Close versus terminate

Description

I would like to know the difference between Pool.close() and Pool.terminate(). The documentation has the following to say:

close() → None
Close the pool to new visitors.

terminate() → None
No running by the pool!

This is unclear. I thought close would prevent new tasks from being submitted, and terminate would stop all currently running tasks, but that doesn't seem to be the case.
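Assuming the split mirrors multiprocessing.Pool (close() stops new submissions and lets queued work drain; terminate() stops workers immediately), the orderly-shutdown pattern would look like this sketch:

import asyncio
from aiomultiprocess import Pool

async def work(x):
    await asyncio.sleep(0.1)
    return x * 2

async def main():
    pool = Pool()
    try:
        results = await pool.map(work, range(10))
    finally:
        pool.close()       # no new tasks accepted from here on
        await pool.join()  # wait for workers to finish and exit
    print(results)
    # pool.terminate() instead of close() would kill workers mid-task

if __name__ == "__main__":
    asyncio.run(main())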

How to get a reference of created process

Hi, I can't manage to get a reference to p (a Process) in another coroutine.
The call from main(), p = await create_process(), is blocked. Any idea, or is this a bug?

import asyncio
from aiomultiprocess import Process

async def do_sleep():
    print('sleep')
    await asyncio.sleep(10000)

async def create_process():
    p = Process(target=do_sleep)
    p.start()
    # p.terminate() # here works
    return p

async def main():
    p = await create_process()
    # do some stuff with process but it's blocked here
    p.terminate()
    
if __name__ == "__main__":
    asyncio.run(main())

Details

  • OS: Linux
  • Python version: Python 3.7.3
  • aiomultiprocess version: 0.7.0

[Question] Exceptions, ProxyException and Sentry

Hi @jreese !

I noticed that currently all exceptions in the worker processes are silenced and transferred back to the master process to raise ProxyExceptions. I'm looking for a way to do something with the exception within the worker process, in my case submit it to Sentry. Doing this from the worker process will allow the Sentry SDK to capture the original exception plus local variables from all frames.

Ideally I want this:
https://github.com/omnilib/aiomultiprocess/blob/main/aiomultiprocess/pool.py#L109

try:
    result = future.result()
except BaseException as e:
    import sentry_sdk; sentry_sdk.capture_exception(e)
    ...

I propose adding an API similar to sys.excepthook to aiomultiprocess.
Do you have any thoughts on this? I can prepare a PR.

Thanks!
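Until such a hook exists, one workaround that needs no library changes is a decorator applied to each worker coroutine, so the exception is captured inside the child before aiomultiprocess turns it into a ProxyException (sentry_sdk.capture_exception is the Sentry SDK's public API; the wrapper itself is just a sketch):

import functools
import sentry_sdk

def report_to_sentry(coro_fn):
    # wrap a worker coroutine so exceptions are reported in-process,
    # with local variables still attached to the live traceback
    @functools.wraps(coro_fn)
    async def wrapper(*args, **kwargs):
        try:
            return await coro_fn(*args, **kwargs)
        except BaseException as e:
            sentry_sdk.capture_exception(e)
            raise  # still let aiomultiprocess raise its ProxyException
    return wrapper

@report_to_sentry
async def my_task(item):
    ...  # hypothetical worker coroutine passed to pool.map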

How can multiple processes be used for the tasks?

Description

I tested the following two methods. method_1 shows that multiprocessing is involved in task dispatching, while in method_2 all tasks run in a single process. Can anyone show me a way to involve multiple processes in method_2? Thanks in advance.

Details

async def get(url):
    async with request("GET", url) as response:
        result = await response.text("utf-8")
        logger.info(len(result))
        return result


async def method_1():
    urls = ["https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev"]
    async with amp.Pool() as pool:
        async for result in pool.map(get, urls):
            logger.info(len(result))


async def method_2():
    pool_tasks = []
    calls_list = ["https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev"]
    async with amp.Pool() as pool:
        for call in calls_list:
            pool_tasks.append(pool.apply(get, args=[call]))
        [await _ for _ in tqdm(asyncio.as_completed(pool_tasks), total=len(pool_tasks), ncols=90, desc="total", position=0, leave=True)]

  • OS: Mac
  • Python version: 3.8.2
  • aiomultiprocess version: 0.9.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

example for server side

Description

Hi,
could you please provide a simple example for the server side, based on this example:
https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams

Details

I tried, but I got this:

RuntimeError: Task <Task pending coro=<handle_echo() running at server0.py:15> cb=[_run_until_complete_cb() at /usr/local/lib/python3.7/asyncio/base_events.py:158]> got Future attached to a different loop

On Windows, nothing happened.

  • OS: linux/windows
  • Python version: 3.7.2
  • aiomultiprocess version: 0.5.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

When building with pyinstaller, error: unrecognized arguments: --multiprocessing-fork pipe_handle=832 occurs.

Description

I am writing a Python program that parses arguments with argparse and uses aiomultiprocess.

https://github.com/pyinstaller/pyinstaller/wiki/Recipe-Multiprocessing

Even with the workaround from that recipe, the problem was still there.

I don't think it's a problem with pyinstaller, so I'll post the issue here.

If not, I'm really sorry.

Thank you for taking the time.

Details

  • OS: Window 10
  • Python version: Python 3.9
  • aiomultiprocess version: 0.7.0

Better examples in README

Description

I used the code from the README file:

from aiohttp import request
from aiomultiprocess import Pool

async def fetch(url):
    return await request("GET", url)

urls = ["https://jreese.sh", ...]
async with Pool() as pool:
    result = await pool.map(fetch, urls)

The result is:

async with Pool() as pool:
         ^
SyntaxError: invalid syntax

Then I followed https://speakerdeck.com/jreese/thinking-outside-the-gil-2?slide=42 and wrapped it in a function like:

async def fetch_all(urls):
    async with Pool() as pool:
        results = await pool.map(fetch, urls)

urls = ["https://github.com", "https://google.com"]
fetch_all(urls)

I ended up with:

try3.py:44: RuntimeWarning: coroutine 'fetch_all' was never awaited
  fetch_all(urls)

So do I still need to use some asyncio function?
Also, I didn't find any example of how to read the result from fetch_all; do I just return results?
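The missing piece is an event loop entry point: async with is only valid inside a coroutine, and a coroutine does nothing until something runs it. A corrected sketch of the README snippet (fetch adjusted to return the body text, since the raw response object is not picklable):

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def fetch(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def fetch_all(urls):
    async with Pool() as pool:
        return await pool.map(fetch, urls)

if __name__ == "__main__":
    urls = ["https://github.com", "https://google.com"]
    # asyncio.run needs Python 3.7+; on 3.6 use
    # asyncio.get_event_loop().run_until_complete(fetch_all(urls))
    results = asyncio.run(fetch_all(urls))
    print([len(r) for r in results])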

Details

  • OS: Ubuntu 16.04
  • Python version: 3.6.8
  • aiomultiprocess version: 0.5.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

Can not use asyncio.Queue in Process

Description

class HTTPSocketServer:
    def __init__(self):
        # config
        self._username = ""
        self._password = ""

        self.wx = None
        self._q = asyncio.Queue(maxsize=10)    

    @staticmethod
    async def ws_response(ws_socket, path):
        """
        websocket的response
        :param ws_socket: websocket
        :param path:
        """
        print(ws_socket, path)
        pass

    async def response(self, path: str, request_headers):
        """
        websocket HTTP Response
        :param path: 
        :param request_headers: 
        :return: HTTP Response
        """
        if path == "/ticketList":
            result = self._q.get()
            print(result)
            response_json = {
                "code": 200,
                "result": result,
                "msg": "success"
            }
            return http.HTTPStatus.OK, [], json.dumps(response_json).encode("UTF-8")
        else:
            result = '\r\n'.join([
                'HTTP/1.1 404 Not Found',
                'Content-Type: text/plain',
                '',
                '404 Not Found\n',
            ]).encode("UTF-8")
            return http.HTTPStatus.BAD_REQUEST, [], result

    async def run_spider(self):
        self.wx = WeiXinJumpSiteTicketCrawler(
            username=self._username,
            password=self._password,
            queue=self._q
        )

        await self.wx.task_start()

    def run_server(self, host: str = "127.0.0.1", port: int = 8080):
        """
        :param host: 
        :param port: 
        """
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(
                    websockets.serve(ws_handler='',
                                     host=host,
                                     port=port,
                                     process_request=self.response))
        print("Host: {0} Port: {1}".format(host, port))
        # 在run_forever之前启动爬虫进程

        spider_process = Process(target=ws.run_spider, args=(self._q, ))
        spider_process.start()
        # run_forever
        event_loop.run_forever()


if __name__ == '__main__':
    ws = HTTPSocketServer()
    ws.run_server(port=10020)

I got the error message:

Traceback (most recent call last):
  File "D:/Python37Projects/weixin_ticket_spider/socket_server.py", line 99, in <module>
    ws.run_server(port=10020)
  File "D:/Python37Projects/weixin_ticket_spider/socket_server.py", line 92, in run_server
    spider_process.start()
  File "C:\Python\Python37\lib\site-packages\aiomultiprocess\core.py", line 140, in start
    return self.aio_process.start()
  File "C:\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
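The failure happens because spawn has to pickle the Process target and its arguments, and a bound method drags the whole server instance along, including an asyncio.Queue, which is tied to one event loop in one process and can never cross that boundary. The usual shape of a fix, sketched with a hypothetical payload: a module-level coroutine plus a multiprocessing manager queue:

import asyncio
from multiprocessing import Manager
from aiomultiprocess import Process

# module-level coroutine: pickled by reference, no instance attached
async def run_spider(q):
    # q is a Manager().Queue() proxy, usable across processes
    q.put({"ticket": "example"})  # hypothetical payload

if __name__ == "__main__":
    manager = Manager()
    q = manager.Queue(maxsize=10)
    p = Process(target=run_spider, args=(q,))
    p.start()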

Details

  • OS: Window10
  • Python version: Python3.7.2
  • aiomultiprocess version: 0.50
  • Can you repro on master? Yes
  • Can you repro in a clean virtualenv? Yes

[Question]: A usecase for numpy processing with frequent IO communication

I am trying to use aiomultiprocess in my project; a simplified version of the current implementation is:

# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe

def worker(master_conn, worker_conn):
    master_conn.close()
    
    env = gym.make('Pendulum-v0')
    env.reset()
    
    while True:
        cmd, data = worker_conn.recv()
        
        if cmd == 'close':
            worker_conn.close()
            break
        elif cmd == 'step':
            results = [env.step(data) for _ in range(1000)]
            worker_conn.send(results)
    
class Master(object):
    def __init__(self):
        self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
        self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True) 
                             for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
        [p.start() for p in self.list_process]
        [worker_conn.close() for worker_conn in self.worker_conns]
        
    def go(self, actions):
        [master_conn.send(['step', action]) for master_conn, action in zip(self.master_conns, actions)]
        results = [master_conn.recv() for master_conn in self.master_conns]
        
        return results
    
    def close(self):
        [master_conn.send(['close', None]) for master_conn in self.master_conns]
        [p.join() for p in self.list_process]

master = Master()
master.go(np.random.rand(10, 1))

It has a lot of IO communication through pipes, and I am wondering how I could speed it up with aiomultiprocess.
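One way to map this onto aiomultiprocess, sketched under the assumption that building the env per task is acceptable (otherwise a per-process initializer would be the next thing to try):

import asyncio
import gym
import numpy as np
from aiomultiprocess import Pool

async def rollout(action):
    # plays the role of one Pipe worker handling a 'step' command
    env = gym.make('Pendulum-v0')
    env.reset()
    return [env.step(action) for _ in range(1000)]

async def main():
    actions = np.random.rand(10, 1)
    async with Pool(processes=10) as pool:
        # one child-process task per action, replacing the Pipe round-trips
        return await pool.map(rollout, list(actions))

if __name__ == '__main__':
    results = asyncio.run(main())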

How to gracefully terminate the multiprocessing pool on Ctrl+C

Description

I've tried all the methods mentioned in this article, but when I press Ctrl+C there's still a lot of exception output. How can I gracefully terminate the multiprocessing pool?
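One pattern that tends to quiet things down is catching the interrupt around the run and terminating the pool explicitly. A sketch (behavior still varies by platform, and Windows signal handling is the least predictable):

import asyncio
from aiomultiprocess import Pool

async def work(x):
    await asyncio.sleep(1)
    return x

async def main():
    pool = Pool()
    try:
        results = await pool.map(work, range(100))
        pool.close()      # normal path: drain and shut down
        return results
    except (KeyboardInterrupt, asyncio.CancelledError):
        pool.terminate()  # interrupt path: stop workers immediately
        raise
    finally:
        await pool.join()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("interrupted, pool terminated")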

Details

  • OS:Win10
  • Python version:CPython 3.7.4
  • aiomultiprocess version: 0.5.0
  • Can you repro on master? Yes
  • Can you repro in a clean virtualenv? Yes

[Question] - Sample code

Hi, I'm facing some issues with processes: some functions are initialized before the process starts and should be called inside this new process due to some dependencies. I found this in the docs:

However, this also requires that any objects or coroutines used must be importable from the fresh child processes. Inner functions, lambdas, or object types defined at runtime, cannot be serialized to these freshly spawned processes.

But how should I execute those functions inside this new process with aiomultiprocess? I didn't quite understand "must be importable from the fresh child processes".

Regards,
Savage.
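"Importable from the fresh child process" means the child starts as a new interpreter, imports your module, and looks the target up by its dotted name, so the target (and anything it needs) has to live at module top level. A sketch of what does and doesn't qualify (init_resource and do_work are hypothetical):

import asyncio
from aiomultiprocess import Process

# OK: top-level coroutine, a spawned child can import and find it;
# do the process-local initialization inside it, after the child starts
async def startup_then_work(config):
    resource = init_resource(config)  # hypothetical per-process setup
    return await do_work(resource)    # hypothetical

async def main():
    # NOT OK: an inner function exists only in this process's memory
    # async def inner(): ...
    # Process(target=inner)  # fails: the child cannot import 'inner'

    p = Process(target=startup_then_work, args=({"host": "example"},))
    p.start()
    await p.join()

if __name__ == "__main__":
    asyncio.run(main())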

Could you please post some examples for a http server

Description

I'm wondering if this library can help multiply the performance of an async HTTP server like aiohttp.
Could you please give an example?
Thanks.

Details

  • OS: Linux
  • Python version: 3.7
  • aiomultiprocess version: latest
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

eventloop blocked on queue.get on subprocess death

Description

I think what I'm seeing is that the event loop gets blocked on queue.get() when the subprocess dies while sending data through the result queue. The repro below shows that the process blocks the event loop (blocked on connection._recv) and fails to make any more progress. queue.recv in multiprocessing.connection does several recv()s, and if the subprocess dies in the middle of a send to the pipe, this can happen. This would "normally" be guarded against by an EOF on the receive end of the pipe, but we don't (and can't) close the writer end of the pipe in the parent process, because we need to pass it into new subprocesses.

I see that aiomultiprocess attempts to detect dead processes and restart them, is that just best effort/is this expected behavior?

minimal repro:

import asyncio
import os
from aiomultiprocess import pool


async def f():
    return ["absc"*1000000]


async def g():
    await asyncio.sleep(.05)
    os.kill(os.getpid(), 9)


async def hello():
    while True:
        print('still alive')
        await asyncio.sleep(1)


if __name__ == '__main__':
    async def main():
        asyncio.create_task(hello())
        async with pool.Pool(processes=1) as p:
            await asyncio.gather(
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(g, ()),
            )


    asyncio.run(main())

Details

  • OS:
  • Python version: 3.8.9
  • aiomultiprocess version: 0.8.0
  • Can you repro on master? yes
  • Can you repro in a clean virtualenv? yes

pool.map does not work on Windows

Description

Hi, @jreese
I have run a test with the pool example.

Code as below:

import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    urls = ["https://jreese.sh", ...]
    async with Pool() as pool:
        result = await pool.map(get, urls)

if __name__ == "__main__":
    asyncio.run(main())

However, I got the error message:

aio process 12492 failed
Traceback (most recent call last):
  File "C:...\Python37\lib\multiprocessing\queues.py", line 107, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:...\Python37\lib\aiomultiprocess\core.py", line 133, in run_async
    result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
  File "C:...\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:...\Python37\lib\aiomultiprocess\pool.py", line 53, in run
    task: PoolTask = self.tx.get_nowait()
  File "...\Python37\lib\multiprocessing\queues.py", line 126, in get_nowait
    return self.get(False)
  File "...\Python37\lib\multiprocessing\queues.py", line 111, in get
    self._rlock.release()
OSError: [WinError 6] invalid handler.

I have checked the code and found that the exception comes from pool.py, lines 53 and 54 (screenshot omitted).

Would you please give me some suggestions?

Details

  • OS: Windows 10 , 64 bit
  • Python version: 3.7.2
  • aiomultiprocess version: 0.7.0
  • Can you repro on master? no
  • Can you repro in a clean virtualenv? yes

The First Simple Example Has a Question

Description

README.md

Executing a coroutine on a child process is as simple as:

async def put(url, params):
    ...

async def main():
    p = Process(target=put, args=("https://jreese.sh", ))
    await p

But the args tuple doesn't include params, so it fails with:

TypeError: put() missing 1 required positional argument: 'params'
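The fix on the caller's side is just supplying both positional arguments in the args tuple; a sketch with a placeholder params value:

import asyncio
from aiomultiprocess import Process

async def put(url, params):
    ...

async def main():
    # args must carry every positional parameter that put() declares
    p = Process(target=put, args=("https://jreese.sh", {"key": "value"}))
    p.start()
    await p.join()

if __name__ == "__main__":
    asyncio.run(main())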

Details

  • OS: Centos 7.4
  • Python version: 3.7.2
  • aiomultiprocess version: 0.5.0
  • Can you repro on master?
  • Can you repro in a clean virtualenv?

Could we have an async multiprocess-safe lock?

Hi,

Thanks for creating such an amazing lib! I'm asking this because the built-in multiprocessing module has locks, which are useful for handling shared memory safely. However, acquiring the lock is a blocking operation, which can be slow in some cases.

Therefore, is it possible to have an async version of multiprocessing lock?

Thanks!
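There's no built-in async cross-process lock, but a common stopgap is pushing the blocking acquire onto a thread so the event loop stays free. A sketch (fairness and cancellation handling deliberately left out):

import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_mp_lock(lock):
    # the blocking acquire() runs in the default thread pool,
    # so other coroutines keep running while we wait
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, lock.acquire)
    try:
        yield
    finally:
        lock.release()

# usage: create a multiprocessing.Lock() in the parent, share it with
# children (inherited under fork, or passed via an initializer), then:
#     async with async_mp_lock(shared_lock):
#         ...critical section...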

[Error] queue.Full on macOS with >32767 items.

Description

Here is my code:

import asyncio
import aiomultiprocess
import aiohttp
import multiprocessing
import logging


async def run(url):
    async with asyncio.Semaphore(500):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
            async with session.get(url, timeout=20) as resp:
                result = await resp.text()
                if 'test' in result:
                    logging.warning(url)
                else:
                    logging.error(url)


async def main():
    url = 'http://www.baidu.com/index.shtml?id='
    urls = [url + str(i) for i in range(1000000)]
    async with aiomultiprocess.Pool(10) as pool:
        await pool.map(run, urls)


if __name__ == '__main__':
    multiprocessing.freeze_support()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Details

Error log:

Traceback (most recent call last):
  File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
    await pool.map(run, urls)
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in map
    tids = [self.queue_work(func, (item,), {}) for item in iterable]
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in <listcomp>
    tids = [self.queue_work(func, (item,), {}) for item in iterable]
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 367, in queue_work
    self.tx_queue.put_nowait((task_id, func, args, kwargs))
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
    main()
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/hywell/Desktop/getr/get-res.py", line 30, in <module>
    loop.run_until_complete(main())
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
    return future.result()
  File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
    await pool.map(run, urls)
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 323, in __aexit__
    self.terminate()
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 437, in terminate
    self.close()
  File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 432, in close
    self.tx_queue.put_nowait(None)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
    return self.put(obj, False)
  File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
    raise Full
queue.Full
  • OS: MacOS Mojave
  • Python version: 3.7.3
  • aiomultiprocess version: 0.5.0
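For context: on macOS, an "unbounded" multiprocessing queue is actually capped at SEM_VALUE_MAX = 32767 pending items, which matches the threshold in the title, and map() enqueues every item up front (see the queue_work list comprehension in the traceback), so a million URLs overflow it. One workaround, sketched here, is to feed map in chunks that stay under the cap:

import asyncio
import aiohttp
import aiomultiprocess

async def run(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=20) as resp:
            return resp.status

async def main():
    url = 'http://www.baidu.com/index.shtml?id='
    urls = [url + str(i) for i in range(1000000)]
    chunk = 30000  # stay below the 32767 queue limit per map call
    async with aiomultiprocess.Pool(10) as pool:
        for i in range(0, len(urls), chunk):
            await pool.map(run, urls[i:i + chunk])

if __name__ == '__main__':
    asyncio.run(main())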
