Comments (8)
If you want to limit the number of tasks being submitted to the pool, consider using some of our abstractions for queues, mutexes/semaphores, or concurrency. You can find these in amphp/sync
and amphp/pipeline
.
For example, you may be able to use a Pipeline
to limit the concurrency.
Pipeline::fromIterable($taskIterable) // $taskIterable is an iterable producing Task objects.
->concurrent(10) // Limit to 10 simultaneous tasks
->map(fn (Task $task) => workerPool()->submit($task))
->map(fn (Execution $execution) => $execution->await())
->forEach(function ($result): void {
// Do something with task result in $result.
});
from parallel.
What do you mean with wait asynchronously?
from parallel.
I mean I don't want that submit
force me to wait for the task completion. I'd prefer to use wait
for it.
from parallel.
Which version of parallel do you use?
from parallel.
2.2
from parallel.
Right now I'm trying to stop the task by throwing an exception from the run
method. I want to throw an exception from submit
- thanks to it I can catch it without any waits. But I found that only die
throws something (ChannelException
) from submit
; any other exceptions can be caught only in the wait
methods.
Ideally, I would start all tasks asynchronously, then call ContextWorkerPool->shutdown()
, but submit
is spoiling the whole raspberries. Also, it's ok for me to throw something like CancelledException
and catch it in submit
.
from parallel.
Thanks! But it for some reason starts x2 threads. For example, in this code tasks started two each, although the concurrent value is set to 1. With concurrent=2 4 simultaneous tasks will be started.
use Amp\Parallel\Worker\Execution;
use Amp\Parallel\Worker\Task;
use Amp\Pipeline\Pipeline;
use Bro\SampleTask;
use function Amp\Parallel\Worker\createWorker;
class SampleTask implements Task
{
public function run( Channel $channel, Cancellation $cancellation ): mixed
{
sleep( 2 );
return true;
}
}
$tasksToStart = 4;
$concurrent = 1;
$task = new SampleTask();
$tasks = [];
for( $taskNum = 0; $taskNum < $tasksToStart; ++$taskNum ) {
$tasks[] = $task;
}
Pipeline::fromIterable( $tasks )
->concurrent( $concurrent )
->map( fn( Task $task ) => createWorker()->submit( $task ) ) // no pools
->map( fn( Execution $execution ) => $execution->await() )
->forEach( function( $result ): void {
$currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
'H-i-s.u'
);
echo "$currentTime\n";
} )
;
Output:
19-34-27.059025
19-34-27.121341
19-34-29.123185
19-34-29.181591
from parallel.
Multiple tasks are being executed because you're creating the tasks before awaiting the tasks to be completed. Awaiting is what will apply back-pressure to the pipeline (or rather the underlying concurrent iterator) and cause it to wait to consume more values from the source iterator.
Changing the example to create an then await the task in the same function will only executed the expected number of tasks at once.
Pipeline::fromIterable( $tasks )
->concurrent( $concurrent )
->map( fn( Task $task ) => createWorker()->submit( $task )->await() ) // submit and await task to complete
->forEach( function( $result ): void {
$currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
'H-i-s.u'
);
echo "$currentTime\n";
} )
;
from parallel.
Related Issues (20)
- Performance degradation in v2.2.0 HOT 18
- Parallel task(s) keep on running since v2.2 HOT 3
- Awaiting multiple channels HOT 7
- Migrate from parallel-function HOT 3
- Sock files are piling up in tmp folder. They are not getting cleared up automatically. HOT 2
- Under XAMPP 8.2.4 the first example program freeze HOT 2
- Expected a valid stream HOT 6
- How to await for tasks in progress? HOT 2
- Do I need to use "readonly" in "FetchTask.php" when "main.php" contains several "$urls" in my case ??? HOT 1
- Hang when using ext-uv and JIT HOT 4
- Dealing with Zombies processes HOT 9
- Parcel (or analog) to data exchange between Parent and Child processes HOT 1
- communication between parallel worker HOT 1
- The pool was shut down HOT 3
- Forwarding User Interaction and Feedback in CLI app HOT 1
- Despite of DeferredCancellation::cancel() call Cancellation::isRequested() is always false in tasks submitted to ContextWorkerPool HOT 4
- Race condition on IPC socket accept
- Reporting Warning in amphp/parallel Usage on Windows System HOT 1
- Documentation: mention bootstrap file for workers
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from parallel.