Comments (5)

noxdafox commented on June 3, 2024

Hello,

the chunksize parameter is intended to reduce the IPC overhead when dealing with large datasets. It is particularly useful when the dataset consists of lots of small elements (e.g., an array of floats): sending them one by one would become a major bottleneck.

When chunksize is greater than 1, the worker process handles the allotted chunk as a whole. In practice, it receives the chunk, processes all of it, and returns the results to the main process in one go, thus minimizing the IPC overhead.
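For illustration, a minimal sketch of the parameter in use (the double function and the data are made up for the example):

from pebble import ProcessPool

def double(value):
    return value * 2

data = [float(i) for i in range(100_000)]

with ProcessPool() as pool:
    # Each worker receives and returns 100 elements per IPC round trip,
    # instead of exchanging one element at a time.
    future = pool.map(double, data, chunksize=100)
    results = list(future.result())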

If one of the elements of the chunk causes the worker process to crash, or to hang until the timeout occurs, the whole chunk is lost. The reason is simple: crashes and timeouts are handled in the main loop, which cannot tell what went wrong inside the affected worker process.

If you want your logic to pinpoint the offending item within your collection, you can rely on the fact that the returned ProcessMapFuture yields the chunks in order of submission. Hence, you can re-submit the offending chunk(s) with a chunksize of 1 and identify the problematic item in your collection; the sketch below illustrates the idea.
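A sketch of this two-pass approach (the compute function, the items list, and the bookkeeping helper are illustrative, not part of the pebble API):

from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired

CHUNKSIZE = 10
TIMEOUT = 5

def pinpoint_timeouts(compute, items):
    with ProcessPool() as pool:
        # First pass: large chunks keep the IPC overhead low.
        future = pool.map(compute, items, timeout=TIMEOUT, chunksize=CHUNKSIZE)
        iterator = future.result()
        index = 0          # position of the next expected result
        lost_chunks = []   # start indices of chunks lost to timeouts/crashes
        while True:
            try:
                next(iterator)
                index += 1
            except StopIteration:
                break
            except (TimeoutError, ProcessExpired):
                # The whole chunk starting at `index` was lost.
                lost_chunks.append(index)
                index += CHUNKSIZE
            except Exception:
                # An ordinary exception replaces a single element.
                index += 1

        # Second pass: re-submit each lost chunk with chunksize=1
        # so every element is reported individually.
        for start in lost_chunks:
            chunk = items[start:start + CHUNKSIZE]
            iterator = pool.map(compute, chunk, timeout=TIMEOUT, chunksize=1).result()
            for offset in range(len(chunk)):
                try:
                    next(iterator)
                except StopIteration:
                    break
                except TimeoutError:
                    print(f"item {start + offset} timed out")
                except Exception:
                    pass  # an ordinary exception still maps to one element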

Jnelen commented on June 3, 2024

Thanks a lot for your reply! Given that information, shouldn't something like this work to ensure the order and length of the output are kept the same?

import numpy as np
from concurrent.futures import TimeoutError

from pebble import ProcessPool

# computeMetric and inputList are defined elsewhere in my script
num_workers = 4  # example value
chunksize = 10
timeoutCounter = 0
errorCounter = 0
outputList = []

with ProcessPool(max_workers=num_workers) as pool:
    future = pool.map(
        computeMetric, inputList, timeout=5, chunksize=chunksize
    )
    iterator = future.result()

    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize

When I use this in some of my tests, the output length still changes. But shouldn't it stay the same if a chunk fails as a whole and I add [np.nan] * chunksize to my final output list? Maybe something in my implementation is wrong?
In my specific case, my computeMetric function comes from the spyrmsd package. However, I will try to put together a simpler working example.

Jnelen commented on June 3, 2024

Hi,

Here is a minimal working example. I just picked some arbitrary numbers that time out or raise another error:

from pebble import ProcessPool
from concurrent.futures import TimeoutError
import numpy as np
import time

num_workers = 1
timeout = 1
chunksize = 4

def processFunction(inputNumber):
    if inputNumber == 23:
        time.sleep(5)
    elif inputNumber == 42:
        raise ValueError
    elif inputNumber == 86:
        time.sleep(5)
    elif inputNumber == 98:
        raise ValueError
    return inputNumber

timeoutCounter = 0
errorCounter = 0

with ProcessPool(max_workers=num_workers) as pool:
    outputList = []

    future = pool.map(processFunction, range(100), timeout=timeout, chunksize=chunksize)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
            outputList.append(result)
        except StopIteration:
            break
        except TimeoutError:
            timeoutCounter += 1
            outputList += [np.nan] * chunksize
        except Exception:
            errorCounter += 1
            outputList += [np.nan] * chunksize

    if timeoutCounter + errorCounter > 0:
        # Calculate the total number of np.nan entries
        failedCompoundsTotal = np.count_nonzero(np.isnan(outputList))
        print(f"{failedCompoundsTotal} compounds failed in total. {timeoutCounter} chunks (up to {timeoutCounter * chunksize} compounds) timed out and were skipped, {errorCounter} chunks raised an error")

print(len(outputList), outputList)

If you run this code example, you'll find that too many np.nan values are inserted when a non-timeout error happens. So it seems that if a single (non-timeout) error occurs in a chunk, the rest of the chunk can still be processed correctly? A timeout, however, does make the whole chunk fail, no matter what did or didn't process successfully. So when a regular error occurs, it seems better to append only a single np.nan instead:

        except Exception:
            errorCounter += 1
            outputList += [np.nan]

I made these changes in my more specific example, but some unexpected behaviour still seems to occur (the output ends up with a different length than the input). I'm still investigating this in more detail. I will update if I find anything; otherwise I will just add a check on the chunksize in the final version of my method.

noxdafox commented on June 3, 2024

Normal errors are returned because the worker can intercept them and pass them back. As I mentioned above, timeouts and crashes lead to the loss of the whole batch. By crash I do not mean a Python exception but an actual crash, such as a segmentation fault or an OOM kill.

The following example shows what I mean.

We pass 10 elements with a chunksize of 2:

  • If the element is equal to 1, we raise an exception. In this case, the whole batch is processed successfully, and element 1 is replaced with the raised exception.
  • If the element is equal to 3, we simulate a timeout. In this case, the whole batch is lost and replaced by a single TimeoutError.
  • If the element is equal to 7, we simulate a crash similar to a segmentation fault. These errors are rare and typically show up when using faulty C libraries or when running out of memory. The whole batch is again lost, and a ProcessExpired error is raised.

import os
import time
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired


def function(value):
    if value == 1:
        raise RuntimeError('BOOM!')
    if value == 3:
        time.sleep(5)
    if value == 7:
        os._exit(1)  # Simulate a crash such as a segfault

    return value


with ProcessPool(max_workers=1) as pool:
    processed = []

    future = pool.map(function, range(10), timeout=1, chunksize=2)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
            processed.append(result)
        except StopIteration:
            break
        except TimeoutError as error:
            processed.append(error)
        except ProcessExpired as error:
            processed.append(error)
        except Exception as error:
            processed.append(error)

    print(processed)

Output:

[0, RuntimeError('BOOM!'), TimeoutError(), 4, 5, ProcessExpired('Abnormal termination'), 8, 9]
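Building on this, a length-preserving version of the earlier accounting loop could look like the following sketch. It reuses the function defined above; the cap on the padding is an assumption to handle a final chunk shorter than the chunksize:

import numpy as np
from concurrent.futures import TimeoutError

from pebble import ProcessPool, ProcessExpired

chunksize = 2
inputs = range(10)
total = len(inputs)

with ProcessPool(max_workers=1) as pool:
    outputList = []

    future = pool.map(function, inputs, timeout=1, chunksize=chunksize)
    iterator = future.result()

    while True:
        try:
            outputList.append(next(iterator))
        except StopIteration:
            break
        except (TimeoutError, ProcessExpired):
            # The whole chunk is lost: pad with one placeholder per element,
            # capped in case the final chunk is shorter than chunksize.
            remaining = total - len(outputList)
            outputList += [np.nan] * min(chunksize, remaining)
        except Exception:
            # An ordinary exception replaces only the raising element.
            outputList.append(np.nan)

print(len(outputList), outputList)  # the length now matches the input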

Jnelen commented on June 3, 2024

Thanks a lot for the clarification, this makes more sense now!
