omnilib / aiomultiprocess
Take a modern Python codebase to the next level of performance.
Home Page: https://aiomultiprocess.omnilib.dev
License: MIT License
Desired use case:
    async for result in pool.map(coro, values):
        ...
This could be achieved by having Pool.map return a special object that would either wait for everything and return all results when awaited or, when used in an async for loop, wait for and return each item/future in order as it becomes ready.
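The idea of one object that can be both awaited and iterated can be sketched with the standard library alone. This is only an illustration of the protocol, not how aiomultiprocess implements it: MapResult, square and demo are hypothetical names, and plain asyncio tasks stand in for work that a pool would send to child processes.

```python
import asyncio


class MapResult:
    """Hypothetical result object: await it for a list, or iterate it in order."""

    def __init__(self, coros):
        # Must be constructed while an event loop is running.
        self._tasks = [asyncio.ensure_future(c) for c in coros]

    def __await__(self):
        # "await result" waits for everything and returns all results as a list.
        return asyncio.gather(*self._tasks).__await__()

    async def __aiter__(self):
        # "async for" yields each result in submission order as soon as it is ready.
        for task in self._tasks:
            yield await task


async def square(x):
    await asyncio.sleep(0.01 * (4 - x))  # later items happen to finish first
    return x * x


async def demo():
    all_at_once = await MapResult(square(v) for v in range(4))
    streamed = [r async for r in MapResult(square(v) for v in range(4))]
    return all_at_once, streamed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Both forms return results in input order; the async for form simply hands over the first item without waiting for the last one.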
We are running into a problem in NixOS where the test suite sometimes fails in these three tests, which causes the whole test suite to hang indefinitely.
/nix/store/66fbv9mmx1j4hrn9y06kcp73c3yb196r-python3-3.8.9/lib/python3.8/multiprocessing/queues.py:110: Empty
=========================== short test summary info ============================
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_exceptions
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_max_tasks - ...
FAILED aiomultiprocess/tests/pool.py::PoolTest::test_pool_worker_stop - _queu...
=================== 3 failed, 28 passed, 2 skipped in 16.45s ===================
building of '/nix/store/lhmnh05x7gs28s4skngm9rfn0q3flic3-python3.8-aiomultiprocess-0.9.0' timed out after 7200 seconds of silence
If I disable these tests using pytest -k, the tests complete just fine.
Does not occur in 0.6.1
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
I have a task that is both CPU-bound and IO-bound, like the toy code below.
When I ran it, I found that the CPU only ran at 100% on a single thread, not on as many threads as there are processes in the Pool.
Is there no benefit to running CPU-bound tasks concurrently with aiomultiprocess, or did I write the code wrong?
import asyncio
import multiprocessing  # needed for current_process() below
from datetime import datetime

from aiohttp import request
from aiomultiprocess import Pool, Process


def fib(x):
    """Recursively compute the x-th Fibonacci number."""
    if x == 0:
        return 0
    elif x == 1:
        return 1
    else:
        return fib(x - 1) + fib(x - 2)


async def get(url):
    # async with request("GET", url) as response:
    #     await asyncio.sleep(1)
    #     return await response.text("utf-8")
    print('url ' + str(multiprocessing.current_process()) + ' ' + str(datetime.now()))
    await asyncio.sleep(5)
    fib(30)
    print('url ' + str(multiprocessing.current_process()) + ' ' + str(datetime.now()))


async def main():
    urls = ["https://jreese.sh", "https://www.baidu.com", "a", "b", "c", "d"]
    # p = Process(target=get, args=("https://jreese.sh", "https://www.baidu.com",))
    # await p
    print(datetime.now())
    async with Pool(4) as pool:
        result = await pool.map(get, urls)
        print(result)
    print(datetime.now())


# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
asyncio.run(main())
from asyncio import gather
import asyncio
from aiomultiprocess import Pool, Worker
from aiohttp import ClientSession
import aiohttp


async def get(url, host, port, user, passwd):
    async with aiohttp.ClientSession() as session:
        proxy_auth = aiohttp.BasicAuth('user', 'passwd')
        async with session.get("http://python.org",
                               proxy="http://host:port",
                               proxy_auth=proxy_auth,
                               ) as resp:
            print(await resp.text())


async def main(host, port, user, passwd):
    urls = ['http://python.org']
    async with Pool() as pool:
        async for result in pool.map(get, urls):
            print('done')


def start(host, port, user, passwd):
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main(host, port, user, passwd))


if __name__ == "__main__":
    start(host, port, user, passwd)
Traceback (most recent call last):
File "test.py", line 31, in <module>
start()
File "test.py", line 27, in start
asyncio.run(main())
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\runners.py", line 44, in run
return loop.run_until_complete(main)
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 616, in run_until_complete
return future.result()
File "test.py", line 21, in main
async for result in pool.map(get, urls):
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 145, in results_generator
yield (await self.pool.results([task_id]))[0]
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 308, in results
raise ProxyException(tb)
aiomultiprocess.types.ProxyException: Traceback (most recent call last):
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 969, in _wrap_create_connection
return await self._loop.create_connection(*args, **kwargs) # type: ignore # noqa
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 1050, in create_connection
transport, protocol = await self._create_connection_transport(
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 1080, in _create_connection_transport
await waiter
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 395, in _loop_writing
self._write_fut = self._loop._proactor.send(self._sock, data)
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 529, in send
self._register_with_iocp(conn)
File "c:\users\administrator\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 718, in _register_with_iocp
_overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
OSError: [WinError 87] The parameter is incorrect
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiomultiprocess\pool.py", line 110, in run
result = future.result()
File "C:\Users\Administrator\Desktop\sizeer\test.py", line 12, in get
async with session.get("http://python.org",
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\client.py", line 1117, in __aenter__
self._resp = await self._coro
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\client.py", line 520, in _request
conn = await self._connector.connect(
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 535, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 890, in _create_connection
_, proto = await self._create_proxy_connection(req, traces, timeout)
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 1139, in _create_proxy_connection
transport, proto = await self._wrap_create_connection(
File "C:\Users\Administrator\.virtualenvs\sizeer-QHiYSeeW\lib\site-packages\aiohttp\connector.py", line 975, in _wrap_create_connection
raise client_error(req.connection_key, exc) from exc
aiohttp.client_exceptions.ClientConnectorError: Cannot connect to host python.org:443 ssl:default [The parameter is incorrect]
I get this error whether I use Worker or Pool. Without proxies it works fine, and adding verify_ssl=False to the request doesn't resolve the issue either. What's more, without Pool or Worker, using only asyncio, it works fine both with and without proxies. However, with Python 3.8.5 on macOS it works, and I'm not sure why (downgrading Python on Windows didn't resolve the problem). I suppose it's an issue with aiomultiprocess. Thanks in advance for your help!
Sorry for this boring question, but is there any reason it has to be 0.005?
Maybe it could be a module-level or package-level constant instead, especially in the loop() of Pool, around the comment "# let someone else do some work for once":
https://github.com/jreese/aiomultiprocess/blob/ef791e96af4d8fd802fecb20353f5abdc002ee0a/aiomultiprocess/core.py#L292
P.S. It seems the class Process is missing a terminate member function.
P.P.S. Maybe #5 refers to Python's fork() support in asyncio; it seems Python 3.6.1+ works. Check my last reply on #5.
The GitHub link on PyPI points to aioitertools instead of aiomultiprocess.
Hi, great library, thanks!
I want to check whether a URL has an image, and my check code is:
async def get_image(url:'http://test.com/', dirs:'[a.jpg,b.jpg,c.jpg]'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            async with session.get(url=url+dir) as resp:
                if resp.status == 200:
                    # resp.content is a stream, not a callable; read() returns the body bytes
                    return await resp.read()
                    # only need one existing image url


async def main(url, dirs):
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image,??,??)
        # how should the arguments be set here?
        print(result)


if __name__ == '__main__':
    urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/',]
    dirs = ['a.jpg','b.jpg','c.jpg']
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(urls, dirs))
I want to use a pool to accomplish this, but I don't know how to design it.
So I designed the code again:
dirs = ['a.jpg', 'b.jpg', 'c.jpg']


async def get_image(url:'http://test.com/'):
    async with aiohttp.ClientSession() as session:
        for dir in dirs:
            print(url+dir)
            async with session.get(url=url+dir, timeout=3) as resp:
                if resp.status == 200:
                    # resp.content is a stream, not a callable; read() returns the body bytes
                    return await resp.read()
                    # only need one existing image url


async def main():
    urls = ['http://1.com/','http://2.com/','http://3.com/','http://4.com/']
    async with aiomultiprocess.Pool() as pool:
        result = await pool.map(get_image, urls)
        print(result)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Now it succeeds, but I want to know: how do I pass two iterable parameters in function main()?
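One way to pass two arguments per task is to build one argument tuple per call and hand the tuples to Pool.starmap, which unpacks each tuple into the coroutine's parameters. The sketch below is untested against real sites and makes some assumptions: build_args and check_all are hypothetical helper names, every URL is paired with the same full list of file names, and aiohttp and aiomultiprocess are imported lazily so the pure helper can be used on its own.

```python
import asyncio


def build_args(urls, dirs):
    """Return one (url, dirs) tuple per base URL, ready for starmap."""
    return [(url, dirs) for url in urls]


async def get_image(url, dirs):
    # Assumption: aiohttp is installed; imported here so it loads in the child.
    import aiohttp

    async with aiohttp.ClientSession() as session:
        for name in dirs:
            async with session.get(url + name) as resp:
                if resp.status == 200:
                    return await resp.read()  # first image found wins
    return None


async def check_all(urls, dirs):
    # Assumption: aiomultiprocess is installed and provides Pool.starmap.
    from aiomultiprocess import Pool

    async with Pool() as pool:
        return await pool.starmap(get_image, build_args(urls, dirs))


if __name__ == "__main__":
    urls = ["http://1.com/", "http://2.com/"]
    dirs = ["a.jpg", "b.jpg", "c.jpg"]
    print(asyncio.run(check_all(urls, dirs)))
```

If every (url, file name) combination should be its own task instead, itertools.product(urls, dirs) produces the argument tuples, with get_image then taking a single file name.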
Run the demo code:
import asyncio
from aiohttp import request
from aiomultiprocess import Worker


async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")


async def main():
    p = Worker(target=get, args=("https://jreese.sh", ))
    response = await p


# asyncio.run(main())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Some errors occur:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 114, in _main
prepare(preparation_data)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/runpy.py", line 85, in _run_code
exec(code, run_globals)
File "/Users/Matrix/Desktop/eoftest.py", line 15, in <module>
loop.run_until_complete(main())
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "/Users/Matrix/Desktop/eoftest.py", line 10, in main
p = Worker(target=get, args=("https://jreese.sh", ))
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 204, in __init__
super().__init__(*args, process_target=Worker.run_async, **kwargs)
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 103, in __init__
namespace=get_manager().Namespace(),
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 29, in get_manager
_manager = context.Manager()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 56, in Manager
m.start()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 563, in start
self._process.start()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 112, in start
self._popen = self._Popen(self)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_fork.py", line 20, in __init__
self._launch(process_obj)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 143, in get_preparation_data
_check_not_importing_main()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/spawn.py", line 136, in _check_not_importing_main
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Traceback (most recent call last):
File "eoftest.py", line 15, in <module>
loop.run_until_complete(main())
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
return future.result()
File "eoftest.py", line 10, in main
p = Worker(target=get, args=("https://jreese.sh", ))
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 204, in __init__
super().__init__(*args, process_target=Worker.run_async, **kwargs)
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 103, in __init__
namespace=get_manager().Namespace(),
File "/usr/local/lib/python3.7/site-packages/aiomultiprocess/core.py", line 29, in get_manager
_manager = context.Manager()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 56, in Manager
m.start()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 567, in start
self._address = reader.recv()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 250, in recv
buf = self._recv_bytes()
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
buf = self._recv(4)
File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 383, in _recv
raise EOFError
EOFError
Post pycon code example
Can the process pool only create one task and then finish itself?
Can I create a daemon process pool, send coroutine tasks to it, and get the results using await?
Thanks
Can we provide an instance method instead of a static function?
    async with Pool() as pool:
        await pool.map(self.write_data, node)
I get back this error and am not sure what to make of it.
Traceback (most recent call last):
File "/usr/lib/python3.8/multiprocessing/queues.py", line 239, in _feed
obj = _ForkingPickler.dumps(obj)
File "/usr/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
Can someone please help me out?
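A likely explanation, offered as a guess from the traceback: sending a bound method to another process means pickling the whole instance, and an instance that holds a threading lock (directly or through a client or connection object) cannot be pickled. The sketch below reproduces that with the standard library only. Writer, PicklableWriter and can_pickle are hypothetical names, and dropping the lock in __getstate__ is one possible workaround, not something the library prescribes.

```python
import pickle
import threading


class Writer:
    def __init__(self, prefix):
        self.prefix = prefix
        self._lock = threading.Lock()  # locks cannot be pickled

    async def write_data(self, node):
        return f"{self.prefix}:{node}"


class PicklableWriter(Writer):
    def __getstate__(self):
        # Leave the lock out of the pickled state.
        state = self.__dict__.copy()
        del state["_lock"]
        return state

    def __setstate__(self, state):
        # Recreate a fresh lock on the receiving side.
        self.__dict__.update(state)
        self._lock = threading.Lock()


def can_pickle(obj):
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


if __name__ == "__main__":
    print(can_pickle(Writer("a").write_data))           # bound method drags the lock along
    print(can_pickle(PicklableWriter("a").write_data))  # lock excluded from the state
```

Another option is a module-level coroutine function that takes only plain data and creates any clients or locks inside the child process.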
When importing aiomultiprocess, I got this error.
from aiomultiprocess import Pool
Traceback (most recent call last):
File "", line 1, in
from aiomultiprocess import Pool
File "C:\Users\patlo\Anaconda3\lib\site-packages\aiomultiprocess\__init__.py", line 11, in
from .core import Pool, Process, Worker
File "C:\Users\patlo\Anaconda3\lib\site-packages\aiomultiprocess\core.py", line 35, in
context = multiprocessing.get_context("fork")
File "C:\Users\patlo\Anaconda3\lib\multiprocessing\context.py", line 238, in get_context
return super().get_context(method)
File "C:\Users\patlo\Anaconda3\lib\multiprocessing\context.py", line 192, in get_context
raise ValueError('cannot find context for %r' % method)
ValueError: cannot find context for 'fork'
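The traceback shows core.py asking for the "fork" context, which Windows does not offer (only "spawn" is available there). As an illustration using only the standard library, a start method can be chosen from those the platform actually provides; pick_start_method is a hypothetical helper, not part of aiomultiprocess.

```python
import multiprocessing


def pick_start_method(preferred=("fork", "spawn")):
    """Return the first preferred start method that this platform supports."""
    available = multiprocessing.get_all_start_methods()
    for method in preferred:
        if method in available:
            return method
    raise RuntimeError(f"none of {preferred} is available; got {available}")


method = pick_start_method()
context = multiprocessing.get_context(method)

if __name__ == "__main__":
    print(method)
```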
Hi, great library, thanks!
I want to run an async generator in a process pool using the aiomultiprocess library. How can I do that? Any pointers? Currently I'm getting an error like:
File "/home/rohankar/anaconda3/lib/python3.6/site-packages/aiomultiprocess/core.py", line 93, in __init__
raise ValueError(f"target must be coroutine function")
ValueError: target must be coroutine function
Script that I tried:
import asyncio


async def ait(nr):
    for i in range(nr):
        await asyncio.sleep(0.1)
        yield i


from aiomultiprocess import Worker


async def main():
    # This works
    # async for i in ait(10):
    #     print(i)
    # This throws an error
    p = Worker(target=ait, args=(20,))
    p.start()
    print(await p)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
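The ValueError arises because an async generator function is not a coroutine function. A possible workaround, sketched here with the standard library only, is to wrap the generator in a coroutine that collects its items into a list and use that wrapper as the target. collect is a hypothetical name, and the trade-off is that results arrive all at once rather than streamed.

```python
import asyncio
import inspect


async def ait(nr):
    for i in range(nr):
        await asyncio.sleep(0.01)
        yield i


async def collect(nr):
    """Coroutine wrapper: drain the async generator and return a list."""
    return [i async for i in ait(nr)]


if __name__ == "__main__":
    print(inspect.isasyncgenfunction(ait), inspect.iscoroutinefunction(collect))
    print(asyncio.run(collect(5)))
```

With this wrapper, Worker(target=collect, args=(20,)) should pass the coroutine-function check, since collect is a plain coroutine function.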
Hi, I'm trying to combine aiomultiprocess with aiopipe, which in its demo shows that it works well with multiprocessing.Process (and indeed it works well with multiprocessing, since I've tried it).
I've modified aiopipe's duplex pipe demo into the snippet below. The only difference is that I changed multiprocessing.Process to aiomultiprocess.Process and made it await proc.join() instead of making a blocking call. The asyncio.run part is also removed since it's no longer needed in aiomultiprocess.
Here is my snippet:
# test.py
import asyncio

from aiomultiprocess import Process
from aiopipe import aioduplex


async def test(ipc):
    async with ipc.open() as (rx, tx):
        tx.write(b"hello")
        rep = await rx.readline()
        tx.write(rep.upper())


async def main():
    mainpipe, chpipe = aioduplex()
    with chpipe.detach() as chpipe:
        proc = Process(target=test, args=(chpipe,))
        proc.start()
    async with mainpipe.open() as (rx, tx):
        req = await rx.read(5)
        tx.write(req + b" world\n")
        msg = await rx.readline()
    await proc.join()
    print(msg)


if __name__ == "__main__":
    asyncio.run(main())
But it failed with the Bad file descriptor error.
aio process 18680 failed
Traceback (most recent call last):
File "<project_dir>/venv/lib/python3.7/site-packages/aiomultiprocess/core.py", line 132, in run_async
result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "<project_dir>/test.py", line 10, in test
async with ipc.open() as (rx, tx):
File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
return await self.gen.__anext__()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 267, in open
async with self._rx.open() as rx, self._tx.open() as tx:
File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
return await self.gen.__anext__()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 133, in open
transport, stream = await self._open()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 196, in _open
os.fdopen(self._fd))
File "/usr/lib/python3.7/os.py", line 1026, in fdopen
return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor
Process SpawnProcess-2:
Traceback (most recent call last):
File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "<project_dir>/venv/lib/python3.7/site-packages/aiomultiprocess/core.py", line 132, in run_async
result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
File "/usr/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "<project_dir>/test.py", line 10, in test
async with ipc.open() as (rx, tx):
File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
return await self.gen.__anext__()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 267, in open
async with self._rx.open() as rx, self._tx.open() as tx:
File "/usr/lib/python3.7/contextlib.py", line 170, in __aenter__
return await self.gen.__anext__()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 133, in open
transport, stream = await self._open()
File "<project_dir>/venv/lib/python3.7/site-packages/aiopipe/__init__.py", line 196, in _open
os.fdopen(self._fd))
File "/usr/lib/python3.7/os.py", line 1026, in fdopen
return io.open(fd, *args, **kwargs)
OSError: [Errno 9] Bad file descriptor
The error was raised from the child process when it tried to open the pipes at the line ipc.open() in function test. The backtrace shows that self._fd, which is obtained from os.pipe(), is a bad file descriptor. (source)
def aiopipe() -> Tuple["AioPipeReader", "AioPipeWriter"]:
    rx, tx = os.pipe()
    return AioPipeReader(rx), AioPipeWriter(tx)
I've dug a little into both aiomultiprocess and aiopipe; neither has a large code base, so tracing the code is not painful.
I see that aiomultiprocess uses multiprocessing.Process under the hood (did I misunderstand anything?).
https://github.com/jreese/aiomultiprocess/blob/aa623804a668ee2d6cd0a8b56a630d1e9bdaef78/aiomultiprocess/core.py#L107-L113
Since aiopipe plays well with multiprocessing but not with aiomultiprocess, I need some help figuring out what I got wrong, and when and why the file descriptor was corrupted.
Thanks for the great library anyway!
aiomultiprocess and aiopipe are installed.
(venv) $ pip freeze
aiomultiprocess==0.7.0
aiopipe==0.2.2
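One detail that may be relevant here, offered as a hint rather than a diagnosis: the traceback names the child "SpawnProcess-2", so the child was started with the spawn method, and a spawned child does not simply receive the parent's open file descriptors the way a forked child does. In addition, descriptors returned by os.pipe() are non-inheritable by default. The standard-library snippet below only shows that default flag and how it can be changed; whether this fully explains the aiopipe failure is an assumption.

```python
import os

# Create a pipe and inspect the inheritable flag of both ends.
rx, tx = os.pipe()
default_flags = (os.get_inheritable(rx), os.get_inheritable(tx))

# The flag can be switched per descriptor.
os.set_inheritable(rx, True)
flag_after_set = os.get_inheritable(rx)

os.close(rx)
os.close(tx)

if __name__ == "__main__":
    print(default_flags, flag_after_set)
```

If the start method is the cause, calling aiomultiprocess.set_start_method("fork") before creating the Process, on platforms that support fork, might be worth trying.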
Hello,
I'm trying out your cool library, thanks! How would one go about limiting concurrency? I get an __aexit__ error when using a BoundedSemaphore.
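For limiting concurrency inside a single process, asyncio.Semaphore supports "async with", unlike the semaphores in threading or multiprocessing. The sketch below uses only the standard library; limited and run_all are hypothetical names, and the semaphore bounds coroutines in one event loop only, not across child processes.

```python
import asyncio


async def limited(sem, state, i):
    async with sem:  # at most "limit" coroutines are inside this block at once
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.01)  # stand-in for real I/O
        state["running"] -= 1
        return i


async def run_all(n, limit):
    sem = asyncio.Semaphore(limit)  # created inside the running event loop
    state = {"running": 0, "peak": 0}
    results = await asyncio.gather(*(limited(sem, state, i) for i in range(n)))
    return results, state["peak"]


if __name__ == "__main__":
    print(asyncio.run(run_all(10, 3)))
```

For a Pool, the number of worker processes and, in versions that provide it, the childconcurrency argument bound how many tasks run at once, so a semaphore shared across processes may not be needed.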
When I use the Pool module, the program gets stuck without showing any error, but only on Windows 10 (with your sample code). CMD shows some errors after a long while.
[Error - 1:22:03 PM] Request textDocument/hover failed.
Message: unknown document
Code: -32000
at Microsoft.Python.LanguageServer.Implementation.ProjectFiles.GetEntry(Uri documentUri, Boolean throwIfMissing)
at Microsoft.Python.LanguageServer.Implementation.ProjectFiles.GetEntry(TextDocumentIdentifier document, Nullable`1 expectedVersion, ProjectEntry& entry, PythonAst& tree)
at Microsoft.Python.LanguageServer.Implementation.Server.Hover(TextDocumentPositionParams params, CancellationToken cancellationToken)
at Microsoft.Python.LanguageServer.Implementation.LanguageServer.Hover(JToken token, CancellationToken cancellationToken)
But I can run this code on CentOS 6; the problem seems to appear only on Windows.
I used a debugger to check this error: the code was stuck in base_events.py in the method run_forever, and it looks like an endless loop there.
Here is my code:
import asyncio
import aiohttp
import time
from aiomultiprocess import Pool


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    session.close()
    return response


async def request():
    url = 'https://jreese.sh'
    urls = [url for _ in range(5)]
    async with Pool() as pool:
        result = await pool.map(get, urls)
        return result


coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
I'm trying to implement a progress bar of some sort, like tqdm. Is there any way to do this with Pool? Should I use Process directly?
Thanks
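One possible approach is to consume results one at a time and update the progress display after each. The sketch below uses only the standard library: with_progress, fake_results and demo are hypothetical names, and fake_results stands in for any async iterator of results, such as the "async for result in pool.map(...)" form shown in another report on this page.

```python
import asyncio


async def with_progress(results, total, report):
    """Re-yield each item from an async iterator, reporting (done, total)."""
    done = 0
    async for item in results:
        done += 1
        report(done, total)
        yield item


async def fake_results(n):
    # Stand-in for results arriving from a pool.
    for i in range(n):
        await asyncio.sleep(0)
        yield i


async def demo():
    seen = []
    wrapped = with_progress(fake_results(3), 3, lambda d, t: seen.append((d, t)))
    results = [r async for r in wrapped]
    return results, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The report callback could just as well call the update method of a tqdm bar instead of appending to a list.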
https://github.com/jreese/aiomultiprocess/runs/313115111
aiomultiprocess starts more than one process if the coroutine is not in the same file. Below is a short example.
import asyncio
from aiomultiprocess import Process, Worker
import aiomultiprocess
import os

aiomultiprocess.set_start_method('fork')


async def do_sleep():
    while True:
        print(os.getpid())
        await asyncio.sleep(1)


async def create_process():
    p = Process(target=do_sleep)
    p.start()
    return p


async def main():
    p = await create_process()


if __name__ == "__main__":
    asyncio.run(main())
If I run this and check the processes, I get the expected result: a main process and a subprocess.
$ ps auxf
159822 .... \_ python process_in_file.py
159828 ............ \_ python process_in_file.py
Now I just move the coroutine to another file. The code is otherwise the same.
process_from_other_module_start.py
import os
import asyncio
from aiomultiprocess import Process, Worker
import aiomultiprocess
import process_from_other_module_import

aiomultiprocess.set_start_method('fork')


async def create_process():
    p = Process(target=process_from_other_module_import.do_sleep)
    return await p


async def main():
    p = await create_process()


if __name__ == "__main__":
    asyncio.run(main())
process_from_other_module_import.py
import asyncio
import os


async def do_sleep():
    while True:
        print(os.getpid())
        await asyncio.sleep(1)
$ ps auxf
159980 ... \_ python process_from_other_module_start.py
159981 ...... \_ python process_from_other_module_start.py
159986 ............. \_ python process_from_other_module_start.py
The system shows me two subprocesses, but I started just one. Is this a bug? Can somebody explain it?
I am facing this error and am not sure how to fix it
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
I tried doing multiprocessing.freeze_support() in main like so:
if __name__ == '__main__':
    import multiprocessing
    multiprocessing.freeze_support()
    asyncio.run(main=entry_point())
I recently stumbled across this tool while trying to optimize my code; this seems like a great project!
My current problem is that I keep getting this error and am unsure why. Testing on WSL 1 (Windows Subsystem for Linux version 1) is not feasible, as the library I am using seems to be broken on WSL 1; however, that's a different issue.
Here is how I am trying to use the tool:
from screenshot_class import take_screenshot, screenshot_handler, _chunk_list, receive

unique_resolved_domains = list(sorted({url.split(':')[0] for url in full if ':' in url}))
# Grab resolved subdomains
# coroutines = [take_screenshot(url) for url in unique_resolved_domains]
# await screenshot_handler(coroutines)
async with aiomultiprocess.Pool() as pool:
    print('Created pool')
    # serialized_tiles = [take_screenshot(url) for url in unique_resolved_domains]
    # print(f'Length of serialized_tiles: {len(serialized_tiles)} ')
    # for chunk in _chunk_list(serialized_tiles, MAX_QUEUE_SIZE):
    # print(f'Chunk: {chunk} and length: {len(chunk)}')
    try:
        await pool.map(take_screenshot, unique_resolved_domains)
        # await pool.map(screenshot_handler, chunk)
    except Exception as ee:
        print(f'An exception has occurred while mapping: {ee}')
        # continue
Yes I did get inspiration from a recent project
screenshot_class:
import asyncio
from pyppeteer import launch


def _chunk_list(items, chunk_size):
    return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)]


async def worker(queue):
    while True:
        # Get a "work item" out of the queue.
        stor = await queue.get()
        try:
            await stor
            queue.task_done()
            # Notify the queue that the "work item" has been processed.
        except Exception:
            queue.task_done()


async def screenshot_handler(lst):
    print('Created screenshot handler')
    queue = asyncio.Queue()
    for stor_method in lst:
        # enqueue the coroutines
        queue.put_nowait(stor_method)
    # Create ten worker tasks to process the queue concurrently.
    tasks = []
    for i in range(10):
        task = asyncio.create_task(worker(queue))
        tasks.append(task)
    # Wait until the queue is fully processed.
    await queue.join()
    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)


async def receive(lst):
    for url in lst:
        await take_screenshot(url)


async def take_screenshot(url):
    url = f'http://{url}' if ('http' not in url and 'https' not in url) else url
    # str.replace returns a new string, so the result must be assigned back
    url = url.replace('www.', '')
    print(f'Taking a screenshot of: {url}')
    browser = await launch(headless=True, ignoreHTTPSErrors=True, args=["--no-sandbox"])
    page = await browser.newPage()
    try:
        await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.106 Safari/537.36')
        # default wait time of 30 seconds
        await page.goto(url)
        await page.screenshot({'path': f'{url.replace("https://", "").replace("http://", "")}.png'})
    except Exception as e:
        print(f'Exception occurred: {e} for: {url} ')
    # No matter what happens make sure browser is closed
    await browser.close()
There are a lot of comments because I tried a lot of different things. I am not sure whether attempting to use an asyncio queue together with aiomultiprocess would be a wise decision. I have not been able to find examples of using queues with aiomultiprocess, so I will need to watch the talk and hope it's there, or give it the old scientific method.
The big problem is that my code seems to run fine, then it runs the program again before spitting out that error. I tried using a Kali VM with the fork method; however, I got the same error.
Apologies for writing an essay 😅
I am running your test case method test_pool_concurrency(), which is in PerfTest.py, but unfortunately I am getting a runtime error. Please find the detailed error below:
-------------------------------------------------Detailed Error-------------------------
RuntimeError: Cannot run the event loop while another loop is running
aio process 8186 failed
Traceback (most recent call last):
File "/home/mastan/env2/lib/python3.6/site-packages/aiomultiprocess/core.py", line 103, in run_async
self.aio_target(*self.aio_args, **self.aio_kwargs)
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 411, in run_forever
'Cannot run the event loop while another loop is running')
RuntimeError: Cannot run the event loop while another loop is running
Process ForkProcess-225:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
self.run()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/home/mastan/env2/lib/python3.6/site-packages/aiomultiprocess/core.py", line 103, in run_async
self.aio_target(*self.aio_args, **self.aio_kwargs)
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
self.run_forever()
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 411, in run_forever
'Cannot run the event loop while another loop is running')
RuntimeError: Cannot run the event loop while another loop is running
aio process 8187 failed
Steps followed:
-> from perf import PerfTest
-> pt = PerfTest()
-> pt.test_pool_concurrency()
The same thing happens when I try to use aiomultiprocess in my present code. Could you please let me know if I am doing anything wrong?
My present scenario:
I am using multiprocessing with pool.starmap() to process records that I read from another resource. I create processes based on the machine's cpu_count(); for example, if there are 10 processes, I create the same number of DB connections and pass each record and DB connection as a tuple so that a process can call the actual method.
from multiprocessing.dummy import Pool
def process_record(record, conn):
# actual processing
def main():
processes = cpu_count() * 5
db_list = []
for p in range(processes):
db_conn_list.append(get_db_connection())
print("Initializing the pool with " + str(processes) + " processes")
pool = Pool(processes)
try:
response = self.client.get_records(ShardIterator=self.iterator)
records = response.get('Records')
arguments = zip(records[], db_conn_list)
pool.starmap(process_record, arguments)
except Exception as e:
pool.close()
pool.join()
time.sleep(1)
if __name__ == '__main__':
main()
Using aiomultiprocess in my current code:
from aiomultiprocess import Pool
async def process_record(record, engine):
processor = BitPostProcessor(conn_string, engine)
await processor.insert_record(record)
async def main():
processes = cpu_count() * 5
db_list = []
for p in range(processes):
db_conn_list.append(get_db_connection())
print("Initializing the pool with " + str(processes) + " processes")
pool = Pool(processes)
try:
response = self.client.get_records(ShardIterator=self.iterator)
records = response.get('Records')
arguments = zip(records[], db_conn_list)
await pool.starmap(process_record, arguments)
except Exception as e:
pool.close()
pool.join()
time.sleep(1)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
Hello, I'm on Windows 10. My problem is with this import:
from aiomultiprocess import Pool
It gives this error:
ImportError: cannot import name 'Pool' from partially initialized module 'aiomultiprocess' (most likely due to a circular import)
Any ideas?
I would like to know the difference between Pool.close() and Pool.terminate. The documentation has the following to say:
close() → None
Close the pool to new visitors.
terminate() → None
No running by the pool!
This is unclear. I thought close would prevent new tasks from being applied, and terminate would stop all currently running tasks, but that doesn't seem to be the case.
Hi, I can't manage to get a reference to p (Process) in another coroutine.
The call p = await create_process() in main() blocks. Any ideas, or is this a bug?
import asyncio
from aiomultiprocess import Process

async def do_sleep():
    print('sleep')
    await asyncio.sleep(10000)

async def create_process():
    p = Process(target=do_sleep)
    p.start()
    # p.terminate() # here works
    return p

async def main():
    p = await create_process()
    # do some stuff with process but it's blocked here
    p.terminate()

if __name__ == "__main__":
    asyncio.run(main())
Hi @jreese !
I noticed that currently all exceptions in the worker processes are silenced and transferred back to the master process, where they are raised as ProxyExceptions. I'm looking for a way to do something with the exception within the worker process; in my case, submit it to Sentry. Doing this from the worker process would allow the Sentry SDK to capture the original exception plus local variables from all frames.
Ideally I want this:
https://github.com/omnilib/aiomultiprocess/blob/main/aiomultiprocess/pool.py#L109
try:
    result = future.result()
except BaseException as e:
    import sentry_sdk; sentry_sdk.capture_exception(e)
    ...
I propose adding an API similar to sys.excepthook to aiomultiprocess.
Do you have any thoughts on this? I can prepare a PR.
Thanks!
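Until such a hook exists, one way to approximate the idea is to wrap the task so that a callback runs inside the worker before the exception travels back to the parent. The following is only a sketch: run_with_hook, report, and flaky are hypothetical names, not part of aiomultiprocess, and the wrapper catches Exception rather than BaseException so that cancellation is not reported.

```python
import asyncio

async def run_with_hook(coro_fn, hook, *args, **kwargs):
    """Await coro_fn(*args, **kwargs); on failure call hook(exc), then re-raise."""
    try:
        return await coro_fn(*args, **kwargs)
    except Exception as exc:
        # Runs in the same process as the task, so the original traceback
        # (and its frames) is still attached to exc at this point.
        hook(exc)
        raise

# Hypothetical hook: a real one might call sentry_sdk.capture_exception(exc).
seen = []

def report(exc):
    seen.append(repr(exc))

# Hypothetical task used only to exercise the wrapper.
async def flaky(x):
    if x < 0:
        raise ValueError("negative input")
    return x * 2

if __name__ == "__main__":
    print(asyncio.run(run_with_hook(flaky, report, 21)))
```

Because run_with_hook, the task, and the hook are all module-level functions, they should remain picklable, so a call along the lines of pool.apply(run_with_hook, (flaky, report, 21)) is plausible; that usage is an assumption and has not been verified against the library.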
It seems there are some cases where there is already an existing, active event loop after forking, which causes an exception when the child process tries to create a new event loop. It should check for an existing event loop first, and only create one if there's no active loop available.
Target function: https://github.com/jreese/aiomultiprocess/blob/master/aiomultiprocess/core.py#L93
See #4 for context.
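For illustration, here is a sketch of a related but different technique from the check described above: instead of looking for an existing loop, the child always builds a private loop, runs the coroutine on it, and closes it. The helper name run_in_fresh_loop and the add coroutine are hypothetical, and this is not a description of how aiomultiprocess itself behaves.

```python
import asyncio

def run_in_fresh_loop(coro_fn, *args, **kwargs):
    """Run coro_fn(*args, **kwargs) on a brand-new event loop and return its result."""
    # A new loop shares no state with whatever loop the parent process had.
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro_fn(*args, **kwargs))
    finally:
        # Always release the loop's resources, even if the coroutine raised.
        loop.close()

# Hypothetical coroutine used only to exercise the helper.
async def add(a, b):
    await asyncio.sleep(0)
    return a + b

if __name__ == "__main__":
    print(run_in_fresh_loop(add, 1, 2))
```

Note that run_until_complete still raises RuntimeError if another loop is already running in the same thread, so this sketch only helps when the child process starts without a running loop.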
I tested the following two methods. method_1 shows that multiple processes are involved in task dispatching, while in method_2 all tasks run in a single process. Can anyone show me how to involve multiple processes in method_2? Thanks in advance.
async def get(url):
    async with request("GET", url) as response:
        result = await response.text("utf-8")
        logger.info(len(result))
        return result

async def method_1():
    urls = ["https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev"]
    async with amp.Pool() as pool:
        async for result in pool.map(get, urls):
            logger.info(len(result))

async def method_2():
    pool_tasks = []
    calls_list = ["https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev", "https://jreese.sh", "https://noswap.com", "https://omnilib.dev"]
    async with amp.Pool() as pool:
        for call in calls_list:
            pool_tasks.append(pool.apply(get, args=[call]))
        [await _ for _ in tqdm(asyncio.as_completed(pool_tasks), total=len(pool_tasks), ncols=90, desc="total", position=0, leave=True)]
Hi,
Could you please provide a simple server-side example based on this one:
https://docs.python.org/3/library/asyncio-stream.html#tcp-echo-server-using-streams
I tried but I got this:
RuntimeError: Task <Task pending coro=<handle_echo() running at server0.py:15> cb=[_run_until_complete_cb() at /usr/local/lib/python3.7/asyncio/base_events.py:158]> got Future attached to a different loop
On Windows, nothing happened.
aiomultiprocess seems to have the same goals as https://github.com/dano/aioprocessing but started a few years later. Do you have any thoughts on why someone might use one over the other?
data = [1, 4, 9, 16, 25]
async for value in pool.map(func, data):
func How do I use pool.map if there are multiple commits?
I am writing a Python program that takes arguments via argparse and uses aiomultiprocess.
https://github.com/pyinstaller/pyinstaller/wiki/Recipe-Multiprocessing
Even with the workaround from that recipe, the problem persisted.
I don't think it's a problem with pyinstaller, so I'll post the issue here.
If not, I'm really sorry.
Thank you for taking the time.
I used the code from the README file:
from aiohttp import request
from aiomultiprocess import Pool

async def fetch(url):
    return await request("GET", url)

urls = ["https://jreese.sh", ...]
async with Pool() as pool:
    result = await pool.map(fetch, urls)
The result is:
    async with Pool() as pool:
    ^
SyntaxError: invalid syntax
Then I followed https://speakerdeck.com/jreese/thinking-outside-the-gil-2?slide=42 and wrapped it in a function like this:
async def fetch_all(urls):
    async with Pool() as pool:
        results = await pool.map(fetch, urls)

urls = ["https://github.com", "https://google.com"]
fetch_all(urls)
I ended up with:
try3.py:44: RuntimeWarning: coroutine 'fetch_all' was never awaited
fetch_all(urls)
So do I still need to use some asyncio function?
I also didn't find any example of how to read the result from fetch_all. Should it just return results?
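Both symptoms above follow from general asyncio rules: `async with` and `await` are only valid inside an `async def`, and calling a coroutine function merely creates a coroutine object that something must run. The sketch below uses only the standard library; fetch here is a hypothetical stand-in that sleeps instead of making an HTTP request, and asyncio.gather takes the place of pool.map.

```python
import asyncio

async def fetch(url):
    # Stand-in for a real request (hypothetical); simulates a little I/O latency.
    await asyncio.sleep(0.01)
    return f"body of {url}"

async def fetch_all(urls):
    # gather() runs the coroutines concurrently and keeps the input order.
    results = await asyncio.gather(*(fetch(u) for u in urls))
    # Return the value so the caller can read it.
    return results

def main():
    urls = ["https://github.com", "https://google.com"]
    # asyncio.run() creates an event loop, awaits the coroutine,
    # and hands back whatever the coroutine returned.
    return asyncio.run(fetch_all(urls))

if __name__ == "__main__":
    print(main())
```

The same shape should apply when the body of fetch_all uses a Pool: return the results from the coroutine and start it with asyncio.run() (Python 3.7+) or loop.run_until_complete().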
class HTTPSocketServer:
    def __init__(self):
        # config
        self._username = ""
        self._password = ""
        self.wx = None
        self._q = asyncio.Queue(maxsize=10)

    @staticmethod
    async def ws_response(ws_socket, path):
        """
        websocket response
        :param ws_socket: websocket
        :param path:
        """
        print(ws_socket, path)
        pass

    async def response(self, path: str, request_headers):
        """
        websocket HTTP Response
        :param path:
        :param request_headers:
        :return: HTTP Response
        """
        if path == "/ticketList":
            result = self._q.get()
            print(result)
            response_json = {
                "code": 200,
                "result": result,
                "msg": "success"
            }
            return http.HTTPStatus.OK, [], json.dumps(response_json).encode("UTF-8")
        else:
            result = '\r\n'.join([
                'HTTP/1.1 404 Not Found',
                'Content-Type: text/plain',
                '',
                '404 Not Found\n',
            ]).encode("UTF-8")
            return http.HTTPStatus.BAD_REQUEST, [], result

    async def run_spider(self):
        self.wx = WeiXinJumpSiteTicketCrawler(
            username=self._username,
            password=self._password,
            queue=self._q
        )
        await self.wx.task_start()

    def run_server(self, host: str = "127.0.0.1", port: int = 8080):
        """
        :param host:
        :param port:
        """
        event_loop = asyncio.get_event_loop()
        event_loop.run_until_complete(
            websockets.serve(ws_handler='',
                             host=host,
                             port=port,
                             process_request=self.response))
        print("Host: {0} Port: {1}".format(host, port))
        # Start the crawler process before run_forever
        spider_process = Process(target=ws.run_spider, args=(self._q, ))
        spider_process.start()
        # run_forever
        event_loop.run_forever()

if __name__ == '__main__':
    ws = HTTPSocketServer()
    ws.run_server(port=10020)
Traceback (most recent call last):
File "D:/Python37Projects/weixin_ticket_spider/socket_server.py", line 99, in <module>
ws.run_server(port=10020)
File "D:/Python37Projects/weixin_ticket_spider/socket_server.py", line 92, in run_server
spider_process.start()
File "C:\Python\Python37\lib\site-packages\aiomultiprocess\core.py", line 140, in start
return self.aio_process.start()
File "C:\Python\Python37\lib\multiprocessing\process.py", line 112, in start
self._popen = self._Popen(self)
File "C:\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\Python\Python37\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
I am trying to use aiomultiprocess in my project. A simplified version of the current implementation is:
# pip install gym
import gym
import numpy as np
from multiprocessing import Process, Pipe

def worker(master_conn, worker_conn):
    master_conn.close()
    env = gym.make('Pendulum-v0')
    env.reset()
    while True:
        cmd, data = worker_conn.recv()
        if cmd == 'close':
            worker_conn.close()
            break
        elif cmd == 'step':
            results = [env.step(data) for _ in range(1000)]
            worker_conn.send(results)

class Master(object):
    def __init__(self):
        self.master_conns, self.worker_conns = zip(*[Pipe() for _ in range(10)])
        self.list_process = [Process(target=worker, args=[master_conn, worker_conn], daemon=True)
                             for master_conn, worker_conn in zip(self.master_conns, self.worker_conns)]
        [p.start() for p in self.list_process]
        [worker_conn.close() for worker_conn in self.worker_conns]

    def go(self, actions):
        [master_conn.send(['step', action]) for master_conn, action in zip(self.master_conns, actions)]
        results = [master_conn.recv() for master_conn in self.master_conns]
        return results

    def close(self):
        [master_conn.send(['close', None]) for master_conn in self.master_conns]
        [p.join() for p in self.list_process]

master = Master()
master.go(np.random.rand(10, 1))
It has a lot of I/O communication through Pipes. I am wondering how I could speed it up with aiomultiprocess.
I've tried all the methods mentioned in this article, but when I press Ctrl+C there is still a lot of exception output. How can I gracefully terminate the multiprocessing pool?
Hi, I'm facing an issue with Process: some functions are initialized before the process starts and need to be called inside the new process because of some dependencies. I found this passage in the docs:
However, this also requires that any objects or coroutines used must be importable from the fresh child processes. Inner functions, lambdas, or object types defined at runtime, cannot be serialized to these freshly spawned processes.
But how should I execute those functions inside the new process with aiomultiprocess? I didn't quite understand "must be importable from the fresh child processes".
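One way to read "importable" in the quoted passage: Python's pickle stores functions by reference (module name plus qualified name), so the receiving process has to be able to import that module and find the same name in it. The sketch below uses only the standard library to show the difference; can_pickle and make_inner are hypothetical helper names.

```python
import asyncio
import pickle

def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exact exception type for unpicklable functions varies by Python version.
        return False

def make_inner():
    # 'inner' only exists while make_inner() runs, so a fresh process
    # has no module-level name it could import to find it again.
    async def inner():
        return "defined at runtime"
    return inner

if __name__ == "__main__":
    print(can_pickle(asyncio.sleep))   # module-level coroutine function
    print(can_pickle(make_inner()))    # inner function created at runtime
    print(can_pickle(lambda: None))    # lambda has no importable name
```

In practice this usually means defining the target coroutine with `async def` at the top level of a module and passing any runtime state to it as picklable arguments.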
Regards,
Savage.
I'm wondering whether this library can help improve the performance of an async HTTP server like aiohttp.
Could you please give an example?
thx.
I think what I'm seeing is that the event loop gets blocked on queue.get() when the subprocess dies while sending data through the result queue. The repro below shows the process blocking the event loop (blocked on connection._recv) so that it fails to make any more progress. It seems that the receive path in multiprocessing.connection performs several recv() calls, and if the subprocess dies in the middle of a send to the pipe, this can happen. Normally this would be guarded against by an EOF on the receiving end of the pipe, but we don't (and can't) close the writer end of the pipe in the parent process because we need to pass it into new subprocesses.
I see that aiomultiprocess attempts to detect dead processes and restart them. Is that just best effort, or is this expected behavior?
minimal repro:
import asyncio
import os
from aiomultiprocess import pool

async def f():
    return ["absc"*1000000]

async def g():
    await asyncio.sleep(.05)
    os.kill(os.getpid(), 9)

async def hello():
    while True:
        print('still alive')
        await asyncio.sleep(1)

if __name__ == '__main__':
    async def main():
        asyncio.create_task(hello())
        async with pool.Pool(processes=1) as p:
            await asyncio.gather(
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(f, ()),
                p.apply(g, ()),
            )
    asyncio.run(main())
I may be playing devil's advocate here, but is there a way to speed up aiomultiprocess further with Stackless Python and the stackless module? I would like to run some tests. I'm building a crawler using aiomultiprocess, which has pretty good performance thanks to @jreese. Has anyone already thought about or implemented this?
Hi, @jreese
I have tested the Pool example.
The code is below:
import asyncio
from aiohttp import request
from aiomultiprocess import Pool

async def get(url):
    async with request("GET", url) as response:
        return await response.text("utf-8")

async def main():
    urls = ["https://jreese.sh", ...]
    async with Pool() as pool:
        result = await pool.map(get, urls)

if __name__ == "__main__":
    asyncio.run(main())
However, I got the error message:
aio process 12492 failed
Traceback (most recent call last):
File "C:...\Python37\lib\multiprocessing\queues.py", line 107, in get
raise Empty
_queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:...\Python37\lib\aiomultiprocess\core.py", line 133, in run_async
result: R = loop.run_until_complete(unit.target(*unit.args, **unit.kwargs))
File "C:...\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
return future.result()
File "C:...\Python37\lib\aiomultiprocess\pool.py", line 53, in run
task: PoolTask = self.tx.get_nowait()
File "...\Python37\lib\multiprocessing\queues.py", line 126, in get_nowait
return self.get(False)
File "...\Python37\lib\multiprocessing\queues.py", line 111, in get
self._rlock.release()
OSError: [WinError 6] invalid handler.
I have checked the code and found that the exception comes from here:
Line 53 and 54.
Would you please give me some suggestions?
README.md
Executing a coroutine on a child process is as simple as:
async def put(url, params):
TypeError: put() missing 1 required positional argument: 'params'
Hi,
Thanks for creating such an amazing lib! I'm asking this because the built-in multiprocessing module has locks which are useful for handling shared memory safely. However, obtaining the lock is a blocking operation which can be slow in some cases.
Therefore, is it possible to have an async version of multiprocessing lock?
Thanks!
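As an illustration of what such an async lock could look like with the standard library alone, the blocking acquire() can be pushed onto the loop's default thread pool so the event loop keeps running while it waits. This is a sketch, not an aiomultiprocess feature; async_acquire is a hypothetical name, and it assumes a lock that any thread may release, which holds for multiprocessing.Lock and threading.Lock.

```python
import asyncio
import multiprocessing
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_acquire(lock):
    """Hold `lock` inside an `async with` block without blocking the event loop."""
    loop = asyncio.get_running_loop()
    # The blocking acquire() runs in a worker thread; other tasks keep running.
    await loop.run_in_executor(None, lock.acquire)
    try:
        yield
    finally:
        # release() does not block, so it is safe to call from the loop thread.
        lock.release()

async def demo(lock):
    async with async_acquire(lock):
        # Critical section: touch the shared resource here.
        await asyncio.sleep(0)
    return "done"

if __name__ == "__main__":
    print(asyncio.run(demo(multiprocessing.Lock())))
```

Each waiting task occupies one executor thread while it waits, so this approach suits modest contention rather than thousands of simultaneous waiters.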
Here is my code:
import asyncio
import aiomultiprocess
import aiohttp
import multiprocessing
import logging

async def run(url):
    async with asyncio.Semaphore(500):
        async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(verify_ssl=False)) as session:
            async with session.get(url, timeout=20) as resp:
                result = await resp.text()
                if 'test' in result:
                    logging.warning(url)
                else:
                    logging.error(url)

async def main():
    url = 'http://www.baidu.com/index.shtml?id='
    urls = [url + str(i) for i in range(1000000)]
    async with aiomultiprocess.Pool(10) as pool:
        await pool.map(run, urls)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
Error log:
Traceback (most recent call last):
File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
await pool.map(run, urls)
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in map
tids = [self.queue_work(func, (item,), {}) for item in iterable]
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 412, in <listcomp>
tids = [self.queue_work(func, (item,), {}) for item in iterable]
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 367, in queue_work
self.tx_queue.put_nowait((task_id, func, args, kwargs))
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
return self.put(obj, False)
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
raise Full
queue.Full
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
main()
File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "/Applications/PyCharm.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "/Applications/PyCharm.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/hywell/Desktop/getr/get-res.py", line 30, in <module>
loop.run_until_complete(main())
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
return future.result()
File "/Users/hywell/Desktop/getr/get-res.py", line 23, in main
await pool.map(run, urls)
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 323, in __aexit__
self.terminate()
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 437, in terminate
self.close()
File "/Users/hywell/.local/share/virtualenvs/getr-AgwWyiyi/lib/python3.7/site-packages/aiomultiprocess/core.py", line 432, in close
self.tx_queue.put_nowait(None)
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 129, in put_nowait
return self.put(obj, False)
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 83, in put
raise Full
queue.Full
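The traceback shows put_nowait raising queue.Full while map enqueues one work item per URL, so one possible workaround is to hand the pool the input in smaller batches instead of all 1,000,000 items at once. The helper below is a standard-library sketch; chunked and the batch size are hypothetical choices, and which batch size actually fits depends on the platform's queue limit.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

if __name__ == "__main__":
    for batch in chunked(range(5), 2):
        print(batch)
```

Inside main() from the report, the loop would then look roughly like `for batch in chunked(urls, 10000): await pool.map(run, batch)`, which waits for each batch to finish before queueing the next one.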