Giter Site home page Giter Site logo

Queue-based pool about parallel HOT 8 CLOSED

hitriyvalenok avatar hitriyvalenok commented on June 14, 2024
Queue-based pool

from parallel.

Comments (8)

trowski avatar trowski commented on June 14, 2024 1

If you want to limit the number of tasks being submitted to the pool, consider using some of our abstractions for queues, mutexes/semaphores, or concurrency. You can find these in amphp/sync and amphp/pipeline.

For example, you may be able to use a Pipeline to limit the concurrency.

Pipeline::fromIterable($taskIterable) // $taskIterable is an iterable producing Task objects.
    ->concurrent(10) // Limit to 10 simultaneous tasks
    ->map(fn (Task $task) => workerPool()->submit($task))
    ->map(fn (Execution $execution) => $execution->await())
    ->forEach(function ($result): void {
        // Do something with task result in $result.
    });

from parallel.

kelunik avatar kelunik commented on June 14, 2024

What do you mean with wait asynchronously?

from parallel.

hitriyvalenok avatar hitriyvalenok commented on June 14, 2024

I mean I don't want that submit force me to wait for the task completion. I'd prefer to use wait for it.

from parallel.

kelunik avatar kelunik commented on June 14, 2024

Which version of parallel do you use?

from parallel.

hitriyvalenok avatar hitriyvalenok commented on June 14, 2024

2.2

from parallel.

hitriyvalenok avatar hitriyvalenok commented on June 14, 2024

Right now I'm trying to stop the task by throwing an exception from the run method. I want to throw an exception from submit - thanks to it I can catch it without any waits. But I found that only die throws something (ChannelException) from submit; any other exceptions can be caught only in the wait methods.

Ideally, I would start all tasks asynchronously, then call ContextWorkerPool->shutdown(), but submit is spoiling the whole raspberries. Also, it's ok for me to throw something like CancelledException and catch it in submit.

from parallel.

hitriyvalenok avatar hitriyvalenok commented on June 14, 2024

Thanks! But it for some reason starts x2 threads. For example, in this code tasks started two each, although the concurrent value is set to 1. With concurrent=2 4 simultaneous tasks will be started.

use Amp\Parallel\Worker\Execution;
use Amp\Parallel\Worker\Task;
use Amp\Pipeline\Pipeline;
use Bro\SampleTask;
use function Amp\Parallel\Worker\createWorker;

class SampleTask implements Task
{
    public function run( Channel $channel, Cancellation $cancellation ): mixed
    {
        sleep( 2 );
        return true;
    }
}

$tasksToStart = 4;
$concurrent = 1;
$task = new SampleTask();

$tasks = [];
for( $taskNum = 0; $taskNum < $tasksToStart; ++$taskNum ) {
    $tasks[] = $task;
}

Pipeline::fromIterable( $tasks )
        ->concurrent( $concurrent )
        ->map( fn( Task $task ) => createWorker()->submit( $task ) ) // no pools
        ->map( fn( Execution $execution ) => $execution->await() )
        ->forEach( function( $result ): void {
            $currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
                'H-i-s.u'
            );
            echo "$currentTime\n";
        } )
;

Output:

19-34-27.059025
19-34-27.121341
19-34-29.123185
19-34-29.181591

from parallel.

trowski avatar trowski commented on June 14, 2024

Multiple tasks are being executed because you're creating the tasks before awaiting the tasks to be completed. Awaiting is what will apply back-pressure to the pipeline (or rather the underlying concurrent iterator) and cause it to wait to consume more values from the source iterator.

Changing the example to create an then await the task in the same function will only executed the expected number of tasks at once.

Pipeline::fromIterable( $tasks )
        ->concurrent( $concurrent )
        ->map( fn( Task $task ) => createWorker()->submit( $task )->await() ) // submit and await task to complete
        ->forEach( function( $result ): void {
            $currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
                'H-i-s.u'
            );
            echo "$currentTime\n";
        } )
;

from parallel.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.