
pebble's Introduction

Pebble

Pebble provides a neat API to manage threads and processes within an application.

Source

https://github.com/noxdafox/pebble

Documentation

https://pebble.readthedocs.io

Download

https://pypi.org/project/Pebble/


Examples

Run a job in a separate thread and wait for its results.

from pebble import concurrent

@concurrent.thread
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

result = future.result()  # blocks until results are ready

Same code with AsyncIO support.

import asyncio

from pebble import asynchronous

@asynchronous.thread
def function(foo, bar=0):
    return foo + bar

async def asynchronous_function():
    result = await function(1, bar=2)  # suspends until the result is ready
    print(result)

asyncio.run(asynchronous_function())

Run a function with a timeout of ten seconds and deal with errors.

from pebble import concurrent
from concurrent.futures import TimeoutError

@concurrent.process(timeout=10)
def function(foo, bar=0):
    return foo + bar

future = function(1, bar=2)

try:
    result = future.result()  # blocks until results are ready
except TimeoutError as error:
    print("Function took longer than %d seconds" % error.args[1])
except Exception as error:
    print("Function raised %s" % error)
    print(error.traceback)  # traceback of the function

Pools support worker restarts, timeouts for long-running tasks, and more.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

TIMEOUT_SECONDS = 3

def function(foo, bar=0):
    return foo + bar

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for index in range(0, 10):
        future = pool.schedule(function, index, bar=1, timeout=TIMEOUT_SECONDS)
        future.add_done_callback(task_done)

pebble's People

Contributors

alanjds, ampolloreno, art049, cbrnr, germn, gistbatch, marxin, matoro, noxdafox, penguinpee, sam-harding, villind, wimglenn


pebble's Issues

EOFError in message_manager_loop

My tool uses both a thread pool and pebble. At some point, when I increase the thread count for the pool and run the tool I get:

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 171, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 200, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 321, in receive
    return self.pool_channel.recv()
  File "/home/pedro/pch/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 60, in recv
    return self.reader.recv()
EOFError

Any ideas what could be going on?

PS: Sorry to bug you with all these issues, but lately I've not been lucky with pebble / threads.

ProcessPool map usage

def task_done(future):
    try:
        print(future.result())
    except TimeoutError as error:
        logging.error("Function took longer than {} seconds".format(error.args[1]))

with ProcessPool(max_workers=args.workers, initializer=proc_init, initargs=(fail_writer,sw_writer,hw_writer,lks,swfh,hwfh,failfh,compile_re,uname,pwd,r_hash)) as p:
    future = p.schedule(getAcrDeviceData, args=[devices[0]], timeout=20)
    future.add_done_callback(task_done)

The above works as expected, i.e. I am able to get the "Function took longer than 20 seconds" message. But if I use map instead (devices is the iterable), the returned future has the timeout exception, yet in task_done the except TimeoutError block does not execute. Am I using map correctly?

with ProcessPool(max_workers=args.workers, initializer=proc_init, initargs=(fail_writer,sw_writer,hw_writer,lks,swfh,hwfh,failfh,compile_re,uname,pwd,r_hash)) as p:
    future = p.map(getAcrDeviceData, devices, timeout=20)
    future.add_done_callback(task_done)

Error upgrading Pebble to 4.4.1 on Windows

Hi,

I'm currently running Pebble 4.4.0 and to get the updated pebble I typed "pip install pebble --upgrade" into a cmd.exe with admin privileges. I got an error, which I have written below.

C:\Windows>pip install pebble --upgrade
Collecting pebble
  Using cached https://files.pythonhosted.org/packages/5e/a3/16f98c868854a6916d68bb6ea02edcf7c91021ad41892862f1307c6831df/Pebble-4.4.1.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 39, in <module>
        version="{}".format(package_version()),
      File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 14, in package_version
        version = read_version(version_path)
      File "C:\Users\jjsag\AppData\Local\Temp\pip-build-cgvge9xr\pebble\setup.py", line 22, in read_version
        return subprocess.check_output(('git', 'describe')).rstrip().decode()
      File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 316, in check_output
        **kwargs).stdout
      File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 383, in run
        with Popen(*popenargs, **kwargs) as process:
      File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 676, in __init__
        restore_signals, start_new_session)
      File "c:\users\jjsag\appdata\local\programs\thonny\lib\subprocess.py", line 955, in _execute_child
        startupinfo)
    FileNotFoundError: [WinError 2] The system cannot find the file specified

    ----------------------------------------

I see Thonny in the stack near the bottom; it is the IDE I use for my Python work, but it has had no issues with any other modules before.

Thanks for reading this; any suggestions will be appreciated.

Faster response with a lower value of SLEEP_UNIT

I am working on a test-case reduction tool (https://github.com/marxin/cvise) where I create ProcessPools and commonly cancel process futures. I noticed that decreasing SLEEP_UNIT from 0.1 to 0.01 can speed up my workload:

From:

...
00:00:52 INFO (98.7%, 2934 bytes)
00:00:52 INFO (98.7%, 2917 bytes)
00:00:53 INFO (98.7%, 2899 bytes)
00:00:53 INFO (98.7%, 2870 bytes)
00:00:53 INFO (98.7%, 2833 bytes)
00:00:53 INFO (98.8%, 2744 bytes)
00:00:53 INFO ===< LinesPass::1 >===

To:

...
00:00:45 INFO (98.7%, 2870 bytes)
00:00:45 INFO (98.7%, 2833 bytes)
00:00:45 INFO (98.8%, 2744 bytes)
00:00:45 INFO ===< LinesPass::1 >===

Would it be possible to decrease the unit, or to expose it as a parameter that a library consumer can control?
Thanks for working on the library!

Using Multiprocessing Pipe

Hello,

Thanks for the module. I am trying to use a Pipe connection object to send data between two processes, but it seems that when using Process, I am not able to pass the connection Pipe as an arg.

So please how can I do this using this module?

Regards
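
Pebble's documentation requires the decorated function's arguments and return values to be picklable, and multiprocessing Connection objects generally are not, which is why passing a Pipe end as an argument fails. A minimal sketch of the usual substitute, assuming the goal is simply to get data back from the child: return the value and let pebble carry it over its own internal channel.

from pebble import concurrent

@concurrent.process
def produce(data):
    # no explicit Pipe needed: the return value travels back
    # over pebble's own channel and lands in the future
    return data.upper()

future = produce("hello")
print(future.result())  # prints HELLO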

Limiting memory usage for pool process

I'm using the pebble process pool to run Python code which I don't fully trust / control. This code parses HTML documents, and for some specific HTML documents it segfaults, enters an endless loop, or consumes a lot of memory.

Pebble allows me to protect my main process from segfaults, and by setting a timeout to the parsing process I'm also protected against endless loops.

What is missing for me now is the case where the parser consumes a lot of memory. My ideal scenario would be one where I could tell pebble to kill the process if its memory usage exceeds X MB.

There seem to be ways of doing this in regular processes (not related to pebble):

But before implementing this in my code I was wondering if @noxdafox had experience doing something like this (ideas, tips, etc.), and whether you would be interested in me implementing this idea in pebble instead of in my code / a pebble subclass.
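
One approach that fits pebble's existing API, sketched under the assumption of a Unix host (the resource module is Unix-only): cap each worker's address space via the pool's initializer, so an over-allocating parse raises MemoryError inside the worker instead of exhausting the machine. The names limit_memory, parse, and MAX_BYTES below are hypothetical:

import resource
from pebble import ProcessPool

MAX_BYTES = 512 * 1024 * 1024  # hypothetical 512 MB cap per worker

def limit_memory():
    # runs once in each freshly started worker process
    resource.setrlimit(resource.RLIMIT_AS, (MAX_BYTES, MAX_BYTES))

def parse(document):
    return len(document)  # stand-in for the untrusted parsing work

with ProcessPool(max_workers=4, initializer=limit_memory) as pool:
    future = pool.schedule(parse, args=[b"<html></html>"], timeout=10)
    print(future.result())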

Exception during interpreter shutdown

This issue is something I found while running unittests in my build environment. It seems like an exception during interpreter shutdown:

Exception in thread Thread-6:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 170, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 199, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/home/ubuntu/virtualenvs/venv-2.7.3/local/lib/python2.7/site-packages/pebble/pool/process.py", line 322, in receive
    return NoMessage()
TypeError: 'NoneType' object is not callable

Not sure if this is related to a) me not calling stop(), terminate(), etc. on the pool, or b) a real bug in the pebble code. Thoughts?

python setup clean modifies the sources

pebble has a rather unusual setup.py, which modifies the sources during the build, even when calling setup.py clean. Seen when packaging pebble. Maybe make this more robust, such that things are not modified, or not appended multiple times? My current workaround is not to call write_version at all.

--- python-pebble-4.5.1.orig/setup.py
+++ python-pebble-4.5.1/setup.py
@@ -17,7 +17,7 @@ def package_version():
 
     version = read_version(version_path)
     write_version(version_path, version)
-    write_version(init_path, version, mode='a')
+    #write_version(init_path, version, mode='a')
 
     return version

Using pebble with inheritance, unexpected behaviour in concurrence decorator

I was trying to call external untrusted subprocesses with the help of pebble. I modelled this with child classes, each having a method with the @concurrent decorator: each child's method calls a different subprocess, one via subprocess.run (it uses a local library) and the other via an HTTP request (it uses an API library). The problem is that the script calls the method from the wrong child class (I double-checked the spelling).
I have reduced the code to a simple version in order to reproduce the problem.
test_peeble_inheritance.zip

If I call the attached script with the "-f apple -a eat" parameters, I get stdout similar to this (the original code does not really use prints; it calls external processes):
Fruit Init
Apple Init
Cherry Eat

Package version

It would be convenient if pebble included a version string, preferably as pebble.__version__. Would this be possible? If yes, I could make a PR.

Please document one user error pattern

It took me more than an hour to realize what was wrong:

#!/usr/bin/env python3

import time

from pebble import ProcessPool

class Tester:
    def __init__(self):
        self.futures = []

    def run_one(self, v):
        time.sleep(1)

    def run(self):
        while True:
            print('new ProcessPool')
            with ProcessPool(max_workers=16) as pool:
                f = pool.schedule(self.run_one, [1])
                self.futures.append(f)
                f.result()

Tester().run()
$ ./sample.py
new ProcessPool
Traceback (most recent call last):
  File "/tmp/pe.py", line 20, in <module>
    Tester().run()
  File "/tmp/pe.py", line 18, in run
    f.result()
  File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 204, in schedule
    self.worker_manager.dispatch(task)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 335, in dispatch
    raise error
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 333, in dispatch
    self.pool_channel.send(WorkerTask(task.id, task.payload))
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 66, in send
    return self.writer.send(obj)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib64/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object

The problem is that pickle is called on my Tester, which contains a reference to a pebble.ProcessFuture.
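
A minimal sketch of the usual fix: schedule a module-level function instead of a bound method, so pickling the callable does not drag in the object that stores the ProcessFuture (and, with it, an unpicklable _thread.RLock):

import time

from pebble import ProcessPool

def run_one(v):
    # module-level, so pickling it does not pull in Tester
    time.sleep(1)

class Tester:
    def __init__(self):
        self.futures = []

    def run(self):
        with ProcessPool(max_workers=16) as pool:
            f = pool.schedule(run_one, args=[1])
            self.futures.append(f)
            f.result()

Tester().run()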

Tasks are still run after being cancelled

Firstly, thank you for such a great library! I'm having great success integrating this, but have come up against one problem: if you cancel a future that is not yet running via future.cancel(), it still gets run. Two things stand out as odd, though:

  1. The task_done callback is called when the future is cancelled, as expected. This means it is not called again when the task actually finishes running.
  2. The task is killed after a short time, meaning it does not run to completion if it takes longer than a couple of seconds.

Note that I am running this on Python 2.7, using the backported concurrent.futures library (https://pypi.org/project/futures/).

Here is a minimal reproduction case which I have been using to test:

import os
import time

from pebble import ProcessPool
from concurrent.futures import TimeoutError, CancelledError

def function():
    print '[{}] Sleeping'.format(os.getpid())
    time.sleep(5)
    print '[{}] Done'.format(os.getpid())


def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except CancelledError:
        print('Cancelled')
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    else:
        print('Function returned: {}'.format(result))

pool = ProcessPool(max_workers=1)
futures = []

for i in range(0, 10):
    future = pool.schedule(function)
    future.add_done_callback(task_done)
    futures.append(future)

time.sleep(2)

for future in futures:
    if not future.running() and not future.done():
        future.cancel()

pool.close()
pool.join()

The output is as follows:

(zapier)$ python demo.py
[1629] Sleeping
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
Cancelled
[1629] Done
Function returned: None
[1629] Sleeping
[1650] Sleeping
[1651] Sleeping
[1652] Sleeping
[1655] Sleeping
[1656] Sleeping
[1657] Sleeping
[1658] Sleeping
[1659] Sleeping

As you can see from the code, we're submitting 10 jobs to the pool, which has only a single worker. Each job waits for 5 seconds before returning. After 2 seconds, we cancel all the jobs which are not running or finished (of which there are 9, as the first job is still sleeping). At this point, all 9 task_done callbacks are fired as expected.

Then, once the first job finishes, we'd expect the program to exit, as pool.join() should return. Instead, the 9 jobs that were previously cancelled each start to run, one after the other. The timestamps aren't shown in my output above, but this happens very fast, so the jobs are being killed almost immediately after starting. You can see by the incrementing pid that the worker process is indeed being killed in each case.


Is there anything we can do here to work around this, or potentially fix the issue? I'm still quite new to the concurrent.futures library, so I may well be doing something wrong!

Thank you again for the great work here!

Edit: I've just confirmed that this happens on Python 3 as well, using the standard library concurrent.futures module.

Corrupted Queue in Python's multiprocessing Pool implementation

Python's multiprocessing pool has various limitations; one I tried to solve with my wrapper code is the timeout of worker processes. I implemented that in a rather ugly way: when the timeout is reached, I os.kill the process. The multiprocessing pool implementation spawns a new worker process, and 99.99% of the time everything works well.

0.01% of the time there are some strange issues and the whole pool stops working, which I believe is because of this issue which is documented in the python docs:

Warning If a process is killed using Process.terminate() or os.kill() while it is trying to use a Queue, then the data in the queue is likely to become corrupted. This may cause any other process to get an exception when it tries to use the queue later on.

While looking for different multiprocessing pool implementations I found yours, which does implement timeouts and as far as I could read from process.py#L360-L366 there is code to avoid killing a process when the queue is locked. Am I reading that part of the code correctly?

Which other issues from python's multiprocessing pool did you fix in your implementation?

Does your pool implementation have any known issues?

pebble.ProcessFuture still running after TimeoutError is raised

I can't cancel a running pebble.ProcessFuture after a timeout is reached:

#!/usr/bin/env python3

from pebble import ProcessPool
from concurrent.futures import wait, FIRST_COMPLETED

import os
import subprocess
import time

def run():
    subprocess.check_output('md5sum /dev/random', shell=True)

with ProcessPool(max_workers=4) as pool:
    print('new ProcessPool')
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, timeout=1))
    print('md5sum started')
    time.sleep(3)
    for f in futures:
        print(f)
        f.cancel()
        print('Cancel: %s' % str(f))
    time.sleep(3)
    print('After cancelation:')
    for f in futures:
        print(f)
$ ./pe.py 
new ProcessPool
md5sum started
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
Cancel: <ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>
After cancelation:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c130 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c1f0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fc929d2c2b0 state=finished raised TimeoutError>

and

$ ps ax | grep md5
19160 pts/1    R      0:18 md5sum /dev/random
19161 pts/1    R      0:18 md5sum /dev/random
19162 pts/1    R      0:18 md5sum /dev/random
19163 pts/1    R      0:19 md5sum /dev/random
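
What the ps output shows are surviving grandchildren: pebble can kill its own worker on timeout, but the md5sum processes launched through shell=True are not its direct children, and by the time cancel() is called the futures are already finished (cancel on a finished future is a no-op). A hedged workaround sketch: enforce the timeout inside the worker with the standard library's own subprocess timeout, avoiding shell=True so md5sum is a direct child and gets killed too.

import subprocess

def run():
    try:
        # subprocess itself kills the md5sum child when the timeout expires
        subprocess.check_output(['md5sum', '/dev/random'], timeout=1)
    except subprocess.TimeoutExpired:
        pass  # the child has been terminated by subprocess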

ProcessPool weird behavior with random generators?

Hi all, I encountered what seems to me a weird behavior using ProcessPool.
I need to run some experiments in parallel which use randomly sampled data at the start of every run. I observed that the sampled data always begins in the same way, as if a fixed seed had been given.
Here a toy example to reproduce the behavior:

from numpy.random import uniform
import pebble


def power2(c):
    x = uniform()
    return x**2, c**2


if __name__ == '__main__':
    values = [2, 4, 6, 8]
    with pebble.ProcessPool() as p:
        res = p.map(power2, values)
    print(list(res.result()))

which prints:

[(0.1495687030442891, 4), (0.1495687030442891, 16), (0.1495687030442891, 36), (0.1495687030442891, 64)]

As you can notice, the first number is always the same...
Using ThreadPool instead of ProcessPool, this behavior does not show up.
Is this a bug, or what am I missing?
Thanks for your help.
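
This looks like the usual fork-inherits-RNG-state effect: each worker starts with a copy of the parent's numpy random state, so the first draw matches across processes (a ThreadPool shares one state that advances between calls, which is why it hides the effect). A minimal sketch of one fix, reseeding each worker from OS entropy via the pool initializer:

from numpy.random import uniform, seed

import pebble


def init_worker():
    seed()  # no argument: reseed from the OS entropy pool


def power2(c):
    x = uniform()
    return x**2, c**2


if __name__ == '__main__':
    values = [2, 4, 6, 8]
    with pebble.ProcessPool(initializer=init_worker) as p:
        res = p.map(power2, values)
    print(list(res.result()))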

Daemonic processes are not allowed to have children

Hi,

While working with your great library I have faced this problem: one
of my processes scheduled using the pool raised this exception:

Daemonic processes are not allowed to have children

Do you know if there is a fix for this?

Feature Request: add `pebble.ProcessFuture.stop`

I would find it handy to be able to selectively stop a ProcessFuture.
Right now, one can only stop all of them with pool.stop. Note that I am speaking about processes, where stop means stopping them abruptly.

Thanks!

Pre-Fork ProcessPool?

I'm having issues using Pebble's ProcessPool with gRPC (described here: grpc/grpc#18342) and a gRPC contributor suggested that it may be due to the processes in ProcessPool not being pre-forked (his comment is here: grpc/grpc#18342 (comment)). Any thoughts on this, and could pre-forking be added as an option? gRPC requires any subprocesses to be forked before the gRPC server starts.

Question on RuntimeError

Hi there,

I've been using pebble for a while, mainly for its timeout capabilities compared to plain concurrent.futures.

Lately I've run into situations where the following runtime error is raised:

File "/infinite/venus/src/task_scheduler/process_scheduler.py", line 231, in _schedule
    return self._executor.schedule(start_job, kwargs={"job": job}, timeout=self._timeout)
  
File "/usr/local/lib/python3.7/site-packages/pebble/pool/process.py", line 85, in schedule
    self._check_pool_state()

File "/usr/local/lib/python3.7/site-packages/pebble/pool/base_pool.py", line 94, in _check_pool_state
    raise RuntimeError('Unexpected error within the Pool')

RuntimeError: Unexpected error within the Pool

I could not find a consistent scenario to trigger it. Sometimes it happens after 1700 jobs have already been scheduled (all of the same type), sometimes everything goes fine even with more than 32000 jobs scheduled.

I'm a bit blind as to what could go wrong; I'd appreciate some pointers as to where I should look... What situations are likely to trigger this?

Thanks :)

_function_handler is not started

Hi,
I use a process via the decorator:
@concurrent.process(timeout=60*60)
but sometimes the function is not started, even though the process is active.

I added some prints to the code, and it gets stuck in

def _function_handler(function, args, kwargs, pipe):
    """Runs the actual function in separate process and returns its result."""
    print(some_print)

So when a task is stuck, print(some_print) is never printed.

For my app I can avoid this by adding another thread that checks for stuck tasks, but I want to understand why this happens.

Thanks, and sorry for my bad English.

TypeError: an integer is required (got type NoneType) in pebble/pool/process.py", line 178, in message_manager_loop

First, I would really like to thank you for the library. I need to create a process pool with the capability to immediately terminate running tasks. I can't understand why the official concurrent.futures lacks this ability.

However, I see various exceptions when I do pool.stop():
https://github.com/marxin/creduce/blob/threadpool/creduce/utils/testing.py#L317-L319

00:00:01 INFO ===< ClangBinarySearchPass::replace-function-def-with-decl >===
Exception in thread Thread-18:
Traceback (most recent call last):
  File "/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/lib64/python3.8/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 178, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 210, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 342, in receive
    return self.pool_channel.recv()
  File "/usr/lib/python3.8/site-packages/pebble/pool/channel.py", line 63, in recv
    return self.reader.recv()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 250, in recv
    buf = self._recv_bytes()
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 421, in _recv_bytes
    return self._recv(size)
  File "/usr/lib64/python3.8/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
TypeError: an integer is required (got type NoneType)
...
Traceback (most recent call last):
  File "/tmp/bin/creduce/usr/local/bin/creduce", line 204, in <module>
    reducer.reduce(pass_group, skip_initial=args.skip_initial_passes)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 118, in reduce
    self._run_additional_passes(pass_group["first"])
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/creduce.py", line 145, in _run_additional_passes
    self.test_manager.run_pass(p)
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 384, in run_pass
    parallel_tests = self.run_parallel_tests()
  File "/tmp/bin/creduce/usr/local/bin/../share/creduce/utils/testing.py", line 319, in run_parallel_tests
    pool.join()
  File "/usr/lib/python3.8/site-packages/pebble/pool/base_pool.py", line 77, in join
    self._stop_pool()
  File "/usr/lib/python3.8/site-packages/pebble/pool/process.py", line 75, in _stop_pool
    loop.join()
  File "/usr/lib64/python3.8/threading.py", line 1011, in join
    self._wait_for_tstate_lock()
  File "/usr/lib64/python3.8/threading.py", line 1027, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

Can you please help me figure out what I am doing wrong?

MemoryError channel recv

While running some scans using w3af, which uses pebble for parsing HTML documents, I get MemoryError exceptions:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 383, in worker_process
    for task in worker_get_next_task(channel, params.max_tasks):
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 398, in worker_get_next_task
    yield fetch_task(channel)
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 404, in fetch_task
    return task_transaction(channel)
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 413, in task_transaction
    task = channel.recv()
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 90, in recv
    return self.reader.recv()
MemoryError

Any ideas what these could be? My research leads me to believe that this could be because of large HTML files being sent through the Pipe, but I was unable to confirm it.

Also, memory usage shouldn't be an issue, I have 64G of RAM and 64G of swap.

Odd Error When Collecting Results from ProcessPool

I was testing timeouts on a Win10 machine (pebble version 4.3.9, Python 3.6.6 from conda), trying to collect and print all of the answers that didn't time out.

from concurrent.futures import TimeoutError
from pebble import ProcessPool

def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)

def main():
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(40), timeout=10)
        iterator = future.result()

        all = []
        while True:
            try:
                all.append(next(iterator))
            except StopIteration:
                break
            except TimeoutError as e:
                print(f'function took longer than {e.args[1]} seconds')

        print(all)

if __name__ == '__main__':
    main()

It seems to be working but it gives me a very dire error:

[0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181, 6765, 10946, 17711, 28657, 46368, 75025, 121393, 196418, 317811, 514229, 832040, 1346269, 2178309, 3524578]
RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\anaconda3\lib\multiprocessing\spawn.py", line 99, in spawn_main
    new_handle = reduction.steal_handle(parent_pid, pipe_handle)
  File "C:\anaconda3\lib\multiprocessing\reduction.py", line 87, in steal_handle
    _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
PermissionError: [WinError 5] Access is denied

Interestingly, part of the error happens even when range(40) is replaced by range(10), but then only the RuntimeError remains and, weirdly, no traceback is given.

RuntimeError: I/O operations still in flight while destroying Overlapped object, the process may crash

Latest wheel (4.3.4) is not compatible with Windows?

Hi,

Thanks for a nice package!

However, I ran into the following while trying to install pebble on Python 3.6 on a Windows machine today (I have installed previous releases without any trouble):

Directly through pip + pypi:

Collecting pebble
  Could not find a version that satisfies the requirement pebble (from versions: )
No matching distribution found for pebble

And when downloading the wheel manually and trying pip install *.whl:

Pebble-4.3.4-py2.py3-none-any.whl is not a supported wheel on this platform.

OSX TypeError: 'NoneType' object is not callable

I keep getting this when running on OSX:
Exception in thread task_scheduler:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner
self.run()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run
self.__target(*self.__args, **self.__kwargs)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 82, in task_scheduler_loop
pool_manager.schedule(task)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 141, in schedule
self.worker_manager.dispatch(task)
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/pool.py", line 252, in dispatch
self.pool_channel.send(NewTask(task.number, task._metadata))
File "/xxx/migrate/lib/python2.7/site-packages/pebble/process/channel.py", line 55, in send
return self.writer.send(obj)
TypeError: 'NoneType' object is not callable

trying to run:

pool = process.Pool()
for q in query:
    job = pool.schedule(background, args=[source, destination, q])

Installing Pebble from pypi for Python 2.7 is broken

The Pebble package on PyPI seems to be misconfigured for Python 2.7.

To wit:

(venv) c:\Users\scole\Documents\ingress>pip install Pebble
Collecting Pebble
  Downloading Pebble-4.1.0-py2.py3-none-any.whl (49kB)
Installing collected packages: Pebble
Successfully installed Pebble-4.1.0

(venv) c:\Users\scole\Documents\ingress>python
Python 2.7.12 (v2.7.12:d33e0cf91556, Jun 27 2016, 15:19:22) [MSC v.1500 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> import pebble
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\__init__.py", line 10, in <module>
    from pebble.pool.thread import ThreadPool
  File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\pool\thread.py", line 21, in <module>
    from pebble.pool.base_pool import BasePool, run_initializer
  File "c:\Users\scole\Documents\ingress\venv\lib\site-packages\pebble\pool\base_pool.py", line 21, in <module>
    from concurrent.futures import Future, TimeoutError
ImportError: No module named concurrent.futures
>>> 

Now, I certainly see the stuff in the top level of this repo that tries to install futures; it just hasn't happened for some reason. (I get this same error on both Windows and Linux installs.)

I'll take a look myself, but I haven't ever used setuptools before personally, so there's a bit of a learning curve there.

Is it compatible with the Windows platform?

The test code:

from pebble import ProcessPool
from concurrent.futures import TimeoutError


def fibonacci(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)


with ProcessPool() as pool:
    future = pool.map(fibonacci, range(50), timeout=10)

    try:
        for n in future.result():
            print(n)
    except TimeoutError:
        print("TimeoutError: aborting remaining computations")
        future.cancel()

I am using pebble==4.3.10 on Windows 10 with Python 3.6.8, and the test code above raised this error:

......
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
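
As the error message itself suggests, on Windows the workers are started with spawn, which re-imports the main module, so the pool creation has to sit behind a main guard. A minimal sketch of the adjusted test code:

from pebble import ProcessPool
from concurrent.futures import TimeoutError


def fibonacci(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)


if __name__ == '__main__':
    # the guard keeps the re-imported module from creating the pool again
    with ProcessPool() as pool:
        future = pool.map(fibonacci, range(50), timeout=10)
        try:
            for n in future.result():
                print(n)
        except TimeoutError:
            print("TimeoutError: aborting remaining computations")
            future.cancel()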

FileNotFoundError During Installation

Version Info

Package Versions:
Pebble version: v4.4.1

Environment Versions:
Docker Image: python:3.6-alpine
Gitlab Runner Version: gitlab-runner 11.10.1

Expected Behavior

Pebble installs on the Docker image, as it did with 4.4.0.

Actual Behavior

Collecting pebble==4.4.1
  Downloading https://files.pythonhosted.org/packages/5e/a3/16f98c868854a6916d68bb6ea02edcf7c91021ad41892862f1307c6831df/Pebble-4.4.1.tar.gz
    ERROR: Command errored out with exit status 1:
     command: /usr/local/bin/python -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-xx5yp0kw/pebble/setup.py'"'"'; __file__='"'"'/tmp/pip-install-xx5yp0kw/pebble/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-install-xx5yp0kw/pebble/pip-egg-info
         cwd: /tmp/pip-install-xx5yp0kw/pebble/
    Complete output (17 lines):
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 39, in <module>
        version="{}".format(package_version()),
      File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 14, in package_version
        version = read_version(version_path)
      File "/tmp/pip-install-xx5yp0kw/pebble/setup.py", line 22, in read_version
        return subprocess.check_output(('git', 'describe')).rstrip().decode()
      File "/usr/local/lib/python3.6/subprocess.py", line 356, in check_output
        **kwargs).stdout
      File "/usr/local/lib/python3.6/subprocess.py", line 423, in run
        with Popen(*popenargs, **kwargs) as process:
      File "/usr/local/lib/python3.6/subprocess.py", line 729, in __init__
        restore_signals, start_new_session)
      File "/usr/local/lib/python3.6/subprocess.py", line 1364, in _execute_child
        raise child_exception_type(errno_num, err_msg, err_filename)
    FileNotFoundError: [Errno 2] No such file or directory: 'git': 'git'
    ----------------------------------------
ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.

Steps to Reproduce

(some may be extraneous)

  1. Pull & Run python:3.6-alpine docker image
    docker pull python:3.6-alpine && docker run -it python:3.6-alpine sh
  2. Attempt to install pebble
    pip install pebble==4.4.1

I got an error when using the pool

My code:

import traceback

from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired

def task_done(iterator):
    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
        except ProcessExpired as error:
            print("%s. Exit code: %d" % (error, error.exitcode))
        except Exception as error:
            print("function raised %s" % error)
            traceback.print_exc()  # Python's traceback of remote process


with ProcessPool(max_workers=PROCESS_POOL_SIZE, max_tasks=10) as pool:
    future = pool.map(crawl_func, summoners, timeout=0)
    iterator = future.result()
    task_done(iterator)

The errors:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\python27\Lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "c:\python27\Lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "c:\python27\Lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "c:\python27\Lib\pickle.py", line 886, in load_eof
    raise EOFError
EOFError

Python 2.7 on Win10. Please help me.

running on a mac causes exception using python 2.7

I am running on a Mac and I get an exception running this code:

    jobs = []
    with ProcessPool( max_workers = workers ) as pool:
        # background is the task that runs and the parameter to it follow
        for q in query:
            job = pool.schedule( background, args=( q, recursion, action, update ) ) 
            job.add_done_callback( finished )
            jobs.append(job)
File "/Users/Dropbox/Work/adops/migrate/lib/python2.7/site-packages/pebble/pool/channel.py", line 44, in unix_poll
   return bool(select([self.reader], [], [], timeout)[0])
error: (9, 'Bad file descriptor')

EOFError with ProcessPool

from pebble import ProcessPool
from concurrent.futures import TimeoutError
import time

if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.map(time.sleep, [1, 25, 62, 7], timeout=8)
        iterator = future.result()

        while True:
            try:
                next(iterator)
            except StopIteration:
                break
            except TimeoutError as error:
                print 'task took too long, {}, {}'.format(error.args[1], error)

        # time.sleep(2)  # <-- Fixes the crash

Yields:

task took too long, 8, ('Task timeout', 8)
task took too long, 8, ('Task timeout', 8)
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 886, in load_eof
    raise EOFError
EOFError

Env:
Windows 7
Python: 2.7.15
Pebble: 4.3.9

You can see the commented time.sleep(2) at the end. Interestingly, this resolves this issue. I suspect maybe something is happening too quickly before all the processes are closed.

I have an Ubuntu 18.04.1 LTS machine, completely up to date, that does not have the same issue. So it seems Windows-related.

ProcessPool hangs on join()

I'm using Pebble 4.4.1 with Python 3.7 on macOS Mojave.
A project of mine hangs when using ProcessPool.join().

I tried the test case in the repo test_process_pool_close_stopped, and it hangs as well.

Error using Map Example

OS: Windows 10 X64
Python Version: 3.5.4 from Anaconda

Example:

from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired

def function(n):
    return n

with ProcessPool() as pool:
    future = pool.map(function, range(100), timeout=10)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
        except ProcessExpired as error:
            print("%s. Exit code: %d" % (error, error.exitcode))
        except Exception as error:
            print("function raised %s" % error)
            print(error.traceback)  # Python's traceback of remote process

Error: AttributeError: Can't get attribute 'function' on <module '__main__' (built-in)>

Stack Trace for one of the processes:

Process Process-3:
Traceback (most recent call last):
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 252, in _bootstrap
    self.run()
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 383, in worker_process
    for task in worker_get_next_task(channel, params.max_tasks):
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 398, in worker_get_next_task
    yield fetch_task(channel)
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 404, in fetch_task
    return task_transaction(channel)
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\process.py", line 413, in task_transaction
    task = channel.recv()
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\site-packages\pebble\pool\channel.py", line 90, in recv
    return self.reader.recv()
  File "C:\Users\Jerem\Anaconda3\envs\airsim\lib\multiprocessing\connection.py", line 251, in recv
    return ForkingPickler.loads(buf.getbuffer())
AttributeError: Can't get attribute 'function' on <module '__main__' (built-in)>

What my end game of using pebble will be (not necessarily from example):
Map a list of arguments to function call using a multiprocessing pool. Have a global timeout. If time runs out, the returns from the function calls (though incomplete) are still valuable and should be saved and further processed in the main process.

Thanks for your help and this great package.
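
The AttributeError suggests the spawned worker re-imported __main__ and could not find function there, which typically happens when the snippet runs without a main guard on Windows, or is pasted into an interactive session. A minimal sketch of the usual arrangement, with the function at the top level of a real .py file and the pool creation guarded:

from pebble import ProcessPool


def function(n):
    # must live at module top level so the spawned worker can import it
    return n


if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.map(function, range(100), timeout=10)
        print(list(future.result()))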

SystemError: NULL result without error in PyObject_Call

Returning a lot of data from a subprocess will raise an exception:

from pebble import ProcessPool
from concurrent.futures import TimeoutError


def function(foo):
    return 'A' * (10 ** foo)


def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function
    else:
        print(len(result))


pool = ProcessPool(max_workers=5, max_tasks=10)

for i in xrange(11):
    future = pool.schedule(function, args=[i], timeout=20)
    future.add_done_callback(task_done)

pool.close()
pool.join()
[eth:~/tools/w3af] [w3af] develop* ± python testp.py 
1
10
100
1000
10000
100000
1000000
10000000
100000000
1000000000
Process Process-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/process.py", line 387, in worker_process
    send_result(channel, Result(task.id, result))
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/common.py", line 163, in send_result
    pipe.send(data)
  File "/home/eth/tools/virtualenvs/w3af/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 97, in unix_send
    return self.writer.send(obj)
SystemError: NULL result without error in PyObject_Call
Function raised Abnormal termination
No handlers could be found for logger "concurrent.futures"
[eth:~/tools/w3af] [w3af] develop* 3s ± 

Note that the exception is NOT caught by except Exception as error: in task_done, which makes it impossible to handle.

This issue seems to be known / common:

And it also seems to be an anti-pattern to send tons of data to / from the subprocess. My goal here is to report this in order to get the conversation started; ultimately I would like this exception to be handled by pebble in such a way that I can then handle it in my code.

This issue might be related to #28

ValueError: filedescriptor out of range in select()

I run a pool.map on 300 million entries. Running locally it works; on the Linux cluster it fails with this exception:

Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 385, in worker_process
    for task in worker_get_next_task(channel, params.max_tasks):
  File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 400, in worker_get_next_task
    yield fetch_task(channel)
  File "/usr/local/lib/python3.6/dist-packages/pebble/pool/process.py", line 404, in fetch_task
    while channel.poll():
  File "/usr/local/lib/python3.6/dist-packages/pebble/pool/channel.py", line 46, in unix_poll
    return bool(select.select([self.reader], [], [], timeout)[0])
ValueError: filedescriptor out of range in select()

Is it possible to switch out the select.select() in channel.py for select.poll()?

Thanks for any help in fixing this error.

Infinite loop on ThreadPool

Hi Matteo,

I left for the end the most complex test case.

The code below may seem a bit long...

What you may notice is that I launch a ThreadPool with 10000 tasks in it.
At a certain time, I raise an exception, either in the callback function or directly in the function launched by the task, in order to test the pool exit.
The pool is then stopped like so:

_POOL.stop()
_POOL.join()

see the pool_stop function.

Also, the behaviour differs depending on whether or not there is a waiting time in the called function.

One last thing I omitted to say: the infinite loop occurs in the _wait_queue_depletion function:

  File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 42, in __exit__
    self.join()
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 71, in join
    self._wait_queue_depletion(timeout)
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 84, in _wait_queue_depletion
    time.sleep(SLEEP_UNIT)
KeyboardInterrupt

You should copy/paste the code below and run it as it is.
I hope that, with these explanations, it will be OK!

Thank you very much again for your help, and all my apologies for this poor piece of code!

Regards,

Philippe

PS: When the test is OK (see below) after I add a waiting time, it seems to be because I get another exception in the library: list.remove(x): x not in list.
This may interrupt the program, so we don't run into an infinite loop.

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from datetime import date, datetime, timedelta
import time, random
import threading
from threading import current_thread
from pebble import ThreadPool
from pebble import ProcessPool

def init_pool():
    global _POOL_RUNNING

    print "%s - New created worker  - _POOL_RUNNING = %s" % (datetime.now(), _POOL_RUNNING)
    return

def pool_stop(cpt):
    global _POOL
    global _LOCK
    global _STOP

    print "Call fonction pool_stop - cpt = %s" % cpt
    with _LOCK:
        if not _STOP:
            _STOP = True
            #_POOL.close()
            print "Call _POOL.stop() ... : _POOL.active = %s" % _POOL.active
            _POOL.stop()
            print "Call _POOL.join() ... : _POOL.active = %s" % _POOL.active
            _POOL.join(0)
        else:
            print "_POOL has been stoped - cpt = %s" % cpt
    print "Exit function pool_stop - cpt = %s" % cpt
    return

def work_todo(cpt, a, b, mode = 'callback', max_hold_time = None, test_exit = False, log = False):
    global _STOP
    global _LOCK
    global _POOL

    res1 = None
    res2 = None
    wait_time = None
    if not _STOP:
        try:
            res1 = a + b
            res2 = a * b
            if max_hold_time:
                wait_time = random.randint(1, max_hold_time)
                print "%s - %s/%s - Wait %ss..." % (cpt, a, b, wait_time)
                time.sleep(wait_time)
                print "%s - %s/%s - End of wait." % (cpt, a, b)
            if not mode and log:
                print "{0}- {1}/{2} - {3} --> {4} secondes -- ({1} + {2}) = {5} ◊◊◊ ({1} * {2}) = {6}".format(
                    cpt, a, b, current_thread().ident, wait_time, res1, res2)

            if test_exit and (not mode or mode == 'err_caller') and b != 0 and b % random.randint(5, 10) == 0:
                # Throw exception to test pool exit :
                msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
                raise RuntimeError(msg)
        except Exception as erreur:
            print "cpt = %s - ERROR IN work_todo >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
            pool_stop(cpt)
    else:
        print "cpt = %s - POOL has been stopped !" % cpt

    return mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, current_thread().ident

def work_done(task):
    global _POOL
    global _LOCK
    global _STOP

    cpt = None
    try:
        mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, thread_id = task.result()
        if log:
            print "DONE -> {0} - {1} - {2}/{3} - {4} --> {5} secondes -- POOL : {6}/{7} -- ({2} + {3}) = {8} ◊◊◊ ({2} * {3}) = {9}".format(
                datetime.now(), cpt, a, b, thread_id, wait_time, _POOL, _POOL.active, res1, res2)
        if mode == 'callback':
            if test_exit and b != 0 and b % random.randint(5, 10) == 0:
                msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
                raise RuntimeError(msg)
    except Exception as erreur:
        print "cpt = %s - ERROR IN work_done >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
        pool_stop(cpt)
    return

def test_thread_pool(maxi, mode, max_hold_time, test_exit=False):
    global _STOP
    global _POOL
    global _POOL_RUNNING

    _STOP = False
    trace_fct =  "test_thread_pool(maxi=%s, mode=\"%s\", max_hold_time=%s, test_exit=%s) : %s tasks ..." % (maxi, mode, max_hold_time, test_exit, maxi*maxi)
    print "\n%s - BEGIN %s" % (datetime.now(), trace_fct)
    task = []
    try:
        _POOL_RUNNING = False
        with ThreadPool(max_workers = 5, max_tasks = 5, initializer = init_pool) as _POOL:
            cpt = 0
            for i in range(0, maxi):
                for j in range(0, maxi):
                    cpt += 1
                    if _STOP:
                        print "In test_thread_pool : cpt = %s - stop loading of POOL : POOL has been stopped !" % cpt
                        break

                    tache = _POOL.schedule(work_todo, args = (cpt, i, j, mode, max_hold_time, test_exit, False))
                    if mode in ['callback', 'err_caller']:
                        tache.add_done_callback(work_done)
                    task.append(tache)
                if _STOP:
                    break
            print "%s - Pool LOADED !!!!" % datetime.now()
            _POOL_RUNNING = True
    except Exception as erreur:
        print " ERROR IN test_thread_pool >>>>>>>>>>>>>>>>>>> ", erreur if erreur.message else type(erreur)
    print "\n%s - END %s" % (datetime.now(), trace_fct)
    return

# GLOBALS :
_STOP = False
_LOCK = threading.Semaphore()
_POOL = None
_POOL_RUNNING = False

# ------------- #
# Fonction main #
# ------------- #
def main(argv):

    maxi = 10
    # OK :
    #test_thread_pool(maxi=5, mode='callback', max_hold_time=0, test_exit=True)
    # NOK :
    test_thread_pool(maxi=100, mode='callback', max_hold_time=0, test_exit=True)
    # OK :
    #test_thread_pool(maxi=100, mode='callback', max_hold_time=5, test_exit=True)
    # NOK :
    #test_thread_pool(maxi=10, mode='err_caller', max_hold_time=0, test_exit=True)
    # NOK :
    test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)
    # OK :
    #test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)

    return

if __name__ == "__main__":
    main(sys.argv[1:])

add_done_callback() not passing the original task

Based on the example on README, let say we have this:-

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def execute_task(task):
    # work on task
    ...
    task.status = 'done'
    task.save()

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for task in get_tasks():
        future = pool.schedule(execute_task, args=[task], timeout=3)
        future.add_done_callback(task_done)

So in the case of a timeout, for example, we may want to set task.status = 'failed', but we don't have any reference to task from the task_done() function. One workaround I can think of is a closure:

def task_done(future, task):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        print("Function took longer than %d seconds" % error.args[1])
        task.status = 'failed'
        task.save()
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for task in get_tasks():
        # bind task as a default argument; a plain closure would resolve
        # task lazily and could see a later iteration's value by the time
        # the callback fires
        def task_done_wrapper(future, task=task):
            task_done(future, task)
        future = pool.schedule(execute_task, args=[task], timeout=3)
        future.add_done_callback(task_done_wrapper)

But is this a viable solution?
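A tidier variant of the same idea is functools.partial, which freezes the current task when the callback is registered. A minimal sketch, reusing execute_task, task_done and get_tasks from above:

from functools import partial

from pebble import ProcessPool

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for task in get_tasks():
        future = pool.schedule(execute_task, args=[task], timeout=3)
        # partial binds this iteration's task, so no wrapper function or
        # late-binding workaround is needed
        future.add_done_callback(partial(task_done, task=task))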

ProcessPool issue : error: (9, 'Bad file descriptor')

Hello again.
Another issue with the example (Pebble 4.2.1 on Mac OS X 10.12.5).
I cannot get through the example on https://pypi.python.org/pypi/Pebble; see the code below.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def function(foo, bar=0):
    return foo + bar

def task_done(future):
    try:
        result = future.result()  # blocks until results are ready
    except TimeoutError as error:
        # TimeoutError must come before the generic Exception clause,
        # otherwise this branch is unreachable
        print("Function took longer than %d seconds" % error.args[1])
    except Exception as error:
        print("Function raised %s" % error)
        print(error.traceback)  # traceback of the function

with ProcessPool(max_workers=5, max_tasks=10) as pool:
    for i in range(0, 10):
        future = pool.schedule(function, args=[i], timeout=3)
        future.add_done_callback(task_done)

At the end of execution it gives:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 801, in __bootstrap_inner
    self.run()
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 155, in message_manager_loop
    pool_manager.process_next_message(SLEEP_UNIT)
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 184, in process_next_message
    message = self.worker_manager.receive(timeout)
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/process.py", line 304, in receive
    if self.pool_channel.poll(timeout):
  File "/usr/local/lib/python2.7/site-packages/pebble/pool/channel.py", line 44, in unix_poll
    return bool(select([self.reader], [], [], timeout)[0])
error: (9, 'Bad file descriptor')

Thank you again in advance for your help! :-)

Feature request: Generators in processes

User story

As a developer I would like to run a generator in another process, and get the results one by one to be able to process them in the main thread.

The function I'm running generates results which uses a lot of RAM (a list with many objects) and each object of the list can be processed individually. If I wouldn't be using a different process to run the function that generates these results, I would certainly use a generator function.

At the moment a function run in a sub-process can only return one result: the list with all the objects. In an ideal scenario, if this gets implemented, future.result() would return a generator which I could iterate over. Each time a result is produced by the sub-process function, it would be pickled, sent through the pipe, unpickled and yielded by the generator.

Notes

  • I know I could do this manually somehow: use files to store the objects, use memcached, use an external queue system, etc. (a rough sketch of one manual approach follows this list)
  • If it is already implemented, please let me know where 👍
  • If there is an easier way to do this, please let me know how 👍
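For the record, here is a minimal sketch of the manual approach mentioned above, using plain multiprocessing rather than Pebble; the helper name stream_results and the None sentinel are assumptions for illustration, not library API:

import multiprocessing

_SENTINEL = None  # assumes the generator never yields None itself

def _producer(queue, generator_func, *args, **kwargs):
    # each item is pickled and pushed through the queue as it is produced
    for item in generator_func(*args, **kwargs):
        queue.put(item)
    queue.put(_SENTINEL)  # signal completion

def stream_results(generator_func, *args, **kwargs):
    queue = multiprocessing.Queue()
    process = multiprocessing.Process(
        target=_producer, args=(queue, generator_func) + args, kwargs=kwargs)
    process.start()
    try:
        while True:
            item = queue.get()
            if item is _SENTINEL:
                break
            yield item  # consumed one by one in the parent process
    finally:
        process.join()

def squares(n):
    for i in range(n):
        yield i * i

if __name__ == '__main__':
    for value in stream_results(squares, 5):
        print(value)  # 0, 1, 4, 9, 16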

Cancel from inside a process pool

Is there a way to cancel a ProcessPool.map call from inside the mapped function?

from pebble import ProcessPool

def func(item):
    result = item * item
    if result > 10:
        pass  # how to cancel the pool from here?
    return result

def main(args):
    with ProcessPool(max_workers=10) as pool:
        future = pool.map(func, [1, 2, 3, 4])
        res = list(future.result())
        # desired: res == [1, 4, 9]

Thanks
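One possible pattern, sketched under the assumption that the future returned by pebble's map supports cancel() for not-yet-started items (the StopPool exception is made up for illustration): let the worker raise a sentinel exception, catch it while iterating the results in the parent, and cancel the remaining computation there.

from pebble import ProcessPool

class StopPool(Exception):
    """Hypothetical sentinel raised by a worker to request cancellation."""

def func(item):
    result = item * item
    if result > 10:
        raise StopPool()
    return result

def main():
    with ProcessPool(max_workers=10) as pool:
        future = pool.map(func, [1, 2, 3, 4])
        results = []
        try:
            for value in future.result():
                results.append(value)
        except StopPool:
            future.cancel()  # stop the items that have not started yet
        print(results)  # [1, 4, 9]

if __name__ == '__main__':
    main()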

a simple approach, imthread

Installation

pip install imthread

Usage

import imthread
import requests

#the function for processing data
def my_func(data):
    imthread.console_log(output=True)
    data = requests.get("http://httpbin.org/get")
    return data

#sending arguments for asynchronous multi thread processing
processed_data = imthread.start(my_func, repeat=20, max_threads=20)

#printing the synchronised received results
print()
print(f'>> Result: {processed_data}')
imthread.elapsed(output=True)

Output

>> Creating Threads 1
>> Creating Threads 2
>> Creating Threads 3
>> Creating Threads 4
>> Creating Threads 5
>> Creating Threads 6
>> Creating Threads 7
>> Creating Threads 8
>> Creating Threads 9
>> Creating Threads 10
>> Creating Threads 11
>> Creating Threads 12
>> Creating Threads 13
>> Creating Threads 14
>> Creating Threads 15
>> Creating Threads 16
>> Creating Threads 17
>> Creating Threads 18
>> Creating Threads 19
>> Creating Threads 20

>> Result: [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]
>> Elapsed time: 0.55 sec

Check out https://github.com/imneonizer/imthread

pebble process pool kills main process [on linux only]

I'm trying to run a function in a process pool with timing out after 5 seconds (in an async environment).

# inside an async function; `loop`, `func` and `foo` are defined elsewhere
with pebble.ProcessPool(1) as pool:
    future = pool.schedule(func, args=[foo], timeout=5)
    async_future = loop.run_in_executor(None, future.result, 5)
    result = await asyncio.wait_for(async_future, 5)

On Windows this works like a charm. But on Linux (I have tried it with Debian and Ubuntu) the main process (the whole application) gets killed, regardless of whether the function takes more or less than 5 seconds. There is no output or error message when the process exits. Is this a bug or am I missing something?

  • Pebble Version: 4.5.3
  • Ubuntu 18.04.2 LTS
  • Debian GNU/Linux 9.12
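For anyone trying to reproduce this, a self-contained harness along the lines of the snippet above; sleepy_function and its 2-second sleep are illustrative assumptions:

import asyncio
import time

import pebble

def sleepy_function(seconds):
    time.sleep(seconds)
    return seconds

async def run():
    loop = asyncio.get_event_loop()
    with pebble.ProcessPool(1) as pool:
        future = pool.schedule(sleepy_function, args=[2], timeout=5)
        # future.result blocks, so hand it to the default thread executor
        async_future = loop.run_in_executor(None, future.result, 5)
        return await asyncio.wait_for(async_future, 5)

if __name__ == '__main__':
    print(asyncio.get_event_loop().run_until_complete(run()))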

Exceptions at WorkerManager.dispatch level are suppressed

Hello!

I ran into an issue caused by a psycopg connection object that cannot be pickled when sending the task payload to the worker. I was eventually able to fix the root cause but the debugging process turned out to be quite tedious because I couldn't see where the actual exception was coming from.

Here's what I was able to find:

  1. process.WorkerManager.dispatch attempts to send the task payload to the worker
    def dispatch(self, task):
        try:
            self.pool_channel.send(WorkerTask(task.id, task.payload))
        except (OSError, EnvironmentError, TypeError) as error:
            raise BrokenProcessPool(error)
  2. The exception is re-raised and encapsulated by BrokenProcessPool (great!)
  3. Since this is happening in the task_scheduler_loop:
    try:
        while context.alive:
            task = task_queue.get()

            if task is not None:
                if task.future.cancelled():
                    task.set_running_or_notify_cancel()
                    task_queue.task_done()
                else:
                    pool_manager.schedule(task)
            else:
                task_queue.task_done()
    except BrokenProcessPool:
        context.state = ERROR

the exception is caught and the state is set to ERROR, but the original message and trace are lost.
  4. On the following _check_pool_state call, the state is ERROR and a new RuntimeError is raised, masking the root cause.

Ideally, the cause of the original exception would either be propagated and then included in the RuntimeError or just sent to stderr, logged, etc.
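As a self-contained illustration (not Pebble's actual code) of how little it would take to keep the root cause visible, one option is to stash the exception and chain it into the eventual RuntimeError (Python 3 syntax):

class BrokenProcessPool(Exception):
    pass

def task_scheduler_loop(context):
    try:
        # stands in for the dispatch failure described above
        raise BrokenProcessPool(TypeError("cannot pickle connection object"))
    except BrokenProcessPool as error:
        context['state'] = 'ERROR'
        context['error'] = error  # keep the cause instead of dropping it

def check_pool_state(context):
    if context['state'] == 'ERROR':
        # 'raise ... from ...' attaches the stored cause to the traceback
        raise RuntimeError('Unexpected error within the Pool') from context['error']

context = {'state': 'OK', 'error': None}
task_scheduler_loop(context)
check_pool_state(context)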

If this is in fact the expected behaviour and it should've been logged, I may have missed a step in the setup part (though I do not recall seeing anything in the docs). Please let me know if that's the case, and thanks for the great work you've done with pebble!
