I left for the end the most complex test case.
The code below may seem a bit long...
What we may notice is that I launch a ThreadPool with 10000 tasks in it.
At a certaint time, I raise an exception : either in the callback function, either directly in the function launched by the task, in order to test the pool exit.
The pool is then stopped doing so :
Also, the behaviour is different, wether I have a waitting time in the called function, or not.
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 42, in __exit__
self.join()
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 71, in join
self._wait_queue_depletion(timeout)
File "/usr/local/lib/python2.7/site-packages/pebble/pool/base_pool.py", line 84, in _wait_queue_depletion
time.sleep(SLEEP_UNIT)
KeyboardInterrupt
You should copy/paste the code below and run it as it is.
I hope, with these explanations, it will be OK !
Thank you very much again for your help and all my apologies for this poor piece of code... !
PS : When the test is OK, (see below), when I add a waitting time, it seems because I have an other exception in the library : list.remove(x): x not in list
This may cause the interruption of the program so we don't run into an infinite loop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from datetime import date, datetime, timedelta
import time, random
import threading
from threading import current_thread
from pebble import ThreadPool
from pebble import ProcessPool
def init_pool():
global _POOL_RUNNING
print "%s - New created worker - _POOL_RUNNING = %s" % (datetime.now(), _POOL_RUNNING)
return
def pool_stop(cpt):
global _POOL
global _LOCK
global _STOP
print "Call fonction pool_stop - cpt = %s" % cpt
with _LOCK:
if not _STOP:
_STOP = True
#_POOL.close()
print "Call _POOL.stop() ... : _POOL.active = %s" % _POOL.active
_POOL.stop()
print "Call _POOL.join() ... : _POOL.active = %s" % _POOL.active
_POOL.join(0)
else:
print "_POOL has been stoped - cpt = %s" % cpt
print "Exit function pool_stop - cpt = %s" % cpt
return
def work_todo(cpt, a, b, mode = 'callback', max_hold_time = None, test_exit = False, log = False):
global _STOP
global _LOCK
global _POOL
res1 = None
res2 = None
wait_time = None
if not _STOP:
try:
res1 = a + b
res2 = a * b
if max_hold_time:
wait_time = random.randint(1, max_hold_time)
print "%s - %s/%s - Wait %ss..." % (cpt, a, b, wait_time)
time.sleep(wait_time)
print "%s - %s/%s - End of wait." % (cpt, a, b)
if not mode and log:
print "{0}- {1}/{2} - {3} --> {4} secondes -- ({1} + {2}) = {5} ◊◊◊ ({1} * {2}) = {6}".format(
cpt, a, b, current_thread().ident, wait_time, res1, res2)
if test_exit and (not mode or mode == 'err_caller') and b != 0 and b % random.randint(5, 10) == 0:
# Throw exception to test pool exit :
msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
raise RuntimeError(msg)
except Exception as erreur:
print "cpt = %s - ERROR IN work_todo >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
pool_stop(cpt)
else:
print "cpt = %s - POOL has been stopped !" % cpt
return mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, current_thread().ident
def work_done(task):
global _POOL
global _LOCK
global _STOP
cpt = None
try:
mode, test_exit, max_hold_time, cpt, wait_time, log, a, b, res1, res2, thread_id = task.result()
if log:
print "DONE -> {0} - {1} - {2}/{3} - {4} --> {5} secondes -- POOL : {6}/{7} -- ({2} + {3}) = {8} ◊◊◊ ({2} * {3}) = {9}".format(
datetime.now(), cpt, a, b, thread_id, wait_time, _POOL, _POOL.active, res1, res2)
if mode == 'callback':
if test_exit and b != 0 and b % random.randint(5, 10) == 0:
msg = "cpt = %s - exception created in work_done to test pool exit" % cpt
raise RuntimeError(msg)
except Exception as erreur:
print "cpt = %s - ERROR IN work_done >>>>>>>>>>>>>>>>>>> %s" % (cpt, erreur)
pool_stop(cpt)
return
def test_thread_pool(maxi, mode, max_hold_time, test_exit=False):
global _STOP
global _POOL
global _POOL_RUNNING
_STOP = False
trace_fct = "test_thread_pool(maxi=%s, mode=\"%s\", max_hold_time=%s, test_exit=%s) : %s tasks ..." % (maxi, mode, max_hold_time, test_exit, maxi*maxi)
print "\n%s - BEGIN %s" % (datetime.now(), trace_fct)
task = []
try:
_POOL_RUNNING = False
with ThreadPool(max_workers = 5, max_tasks = 5, initializer = init_pool) as _POOL:
cpt = 0
for i in range(0, maxi):
for j in range(0, maxi):
cpt += 1
if _STOP:
print "In test_thread_pool : cpt = %s - stop loading of POOL : POOL has been stopped !" % cpt
break
tache = _POOL.schedule(work_todo, args = (cpt, i, j, mode, max_hold_time, test_exit, False))
if mode in ['callback', 'err_caller']:
tache.add_done_callback(work_done)
task.append(tache)
if _STOP:
break
print "%s - Pool LOADED !!!!" % datetime.now()
_POOL_RUNNING = True
except Exception as erreur:
print " ERROR IN test_thread_pool >>>>>>>>>>>>>>>>>>> ", erreur if erreur.message else type(erreur)
print "\n%s - END %s" % (datetime.now(), trace_fct)
return
# GLOBALS :
_STOP = False
_LOCK = threading.Semaphore()
_POOL = None
_POOL_RUNNING = False
# ------------- #
# Fonction main #
# ------------- #
def main(argv):
maxi = 10
# OK :
#test_thread_pool(maxi=5, mode='callback', max_hold_time=0, test_exit=True)
# NOK :
test_thread_pool(maxi=100, mode='callback', max_hold_time=0, test_exit=True)
# OK :
#test_thread_pool(maxi=100, mode='callback', max_hold_time=5, test_exit=True)
# NOK :
#test_thread_pool(maxi=10, mode='err_caller', max_hold_time=0, test_exit=True)
# NOK :
test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)
# OK :
#test_thread_pool(maxi=10, mode='', max_hold_time=0, test_exit=True)
return
if __name__ == "__main__":
main(sys.argv[1:])