
An advanced parallelization library for PHP, enabling efficient multitasking, optimizing resource use, and application responsiveness through multiple CPU threads.

License: MIT License

PHP 100.00%
concurrency multiprocessing php parallel-processing amphp parallel revolt

parallel's Introduction

amphp/parallel

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/parallel provides true parallel processing for PHP using multiple processes or threads, without blocking and without requiring any extensions.

To be as flexible as possible, this library comes with a collection of non-blocking concurrency tools that can be used independently as needed, as well as an "opinionated" worker API that allows you to assign units of work to a pool of worker processes.

Requirements

  • PHP 8.1+

Optional requirements to use threads instead of processes

  • A ZTS build of PHP 8.2+
  • ext-parallel

Installation

This package can be installed as a Composer dependency.

composer require amphp/parallel

Usage

The basic usage of this library is to submit blocking tasks to be executed by a worker pool in order to avoid blocking the main event loop.

<?php

require __DIR__ . '/../vendor/autoload.php';

use Amp\Future;
use Amp\Parallel\Worker;
use function Amp\async;

$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$executions = [];
foreach ($urls as $url) {
    // FetchTask is just an example, you'll have to implement
    // the Task interface for your task.
    $executions[$url] = Worker\submit(new FetchTask($url));
}

// Each submission returns an Execution instance to allow two-way
// communication with a task. Here we're only interested in the
// task result, so we use the Future from Execution::getFuture()
$responses = Future\await(array_map(
    fn (Worker\Execution $e) => $e->getFuture(),
    $executions,
));

foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

FetchTask is just used as an example for a blocking function here. If you just want to fetch multiple HTTP resources concurrently, it's better to use amphp/http-client, our non-blocking HTTP client.

Note: The functions you call must be predefined or autoloadable by Composer, so they also exist in the worker process or thread.

Workers

Worker provides a simple interface for executing PHP code in parallel in a separate PHP process or thread. Classes implementing Task are used to define the code to be run in parallel.

Tasks

The Task interface has a single run() method that gets invoked in the worker to dispatch the work that needs to be done. The run() method can be written using blocking code since the code is executed in a separate process or thread.

Task instances are serialize'd in the main process and unserialize'd in the worker. That means that all data that is passed between the main process and a worker needs to be serializable.
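
The sketch below illustrates that constraint in plain PHP, assuming the data crosses the process boundary through the native serialize() and unserialize() functions. It does not use any amphp class; ResizeJob and survivesSerialization() are hypothetical names introduced only for this illustration.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for a task: it holds plain data only, so it serializes cleanly.
final class ResizeJob
{
    public function __construct(
        public readonly string $path,
        public readonly int $width,
    ) {
    }
}

// Hypothetical helper: reports whether a value survives a serialize()/unserialize() round trip.
function survivesSerialization(mixed $value): bool
{
    try {
        unserialize(serialize($value));
        return true;
    } catch (\Throwable) {
        return false;
    }
}

$job = new ResizeJob('/tmp/photo.jpg', 640);

// The copy carries the same data but is a different object instance,
// just as a worker would receive a copy rather than the original.
$copy = unserialize(serialize($job));
var_dump($copy == $job, $copy === $job);

// Closures cannot be serialized natively, so they cannot be task properties.
var_dump(survivesSerialization(fn () => 42));
```

A practical consequence is that constructor arguments of a task are best kept to scalars, arrays, and simple value objects, with anything heavier (connections, handles, callbacks) created inside run().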

In the example below, a Task is defined which calls a blocking function (file_get_contents() is only an example of a blocking function, use http-client for non-blocking HTTP requests).

Child processes or threads executing tasks may be reused to execute multiple tasks.

// FetchTask.php
// Tasks must be defined in a file which can be loaded by the composer autoloader.

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

class FetchTask implements Task
{
    public function __construct(
        private readonly string $url,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): string
    {
        return file_get_contents($this->url); // Example blocking function
    }
}

// main.php

$worker = Amp\Parallel\Worker\createWorker();
$task = new FetchTask('https://amphp.org');

$execution = $worker->submit($task);

// $data will be the return value from FetchTask::run()
$data = $execution->await();

Sharing data between tasks

Tasks may wish to share data between task runs. A Cache instance stored in a static property that is only initialized within Task::run() is our recommended strategy to share data.

use Amp\Cache\LocalCache;
use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

final class ExampleTask implements Task
{
    private static ?LocalCache $cache = null;
    
    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        $cache = self::$cache ??= new LocalCache();
        $cachedValue = $cache->get('cache-key');
        // Use and modify $cachedValue; the counter below is only a placeholder computation.
        $updatedValue = ($cachedValue ?? 0) + 1;
        $cache->set('cache-key', $updatedValue);
        return $updatedValue;
    }
}

You may wish to provide a hook to initialize the cache with mock data for testing.

A worker may be executing multiple tasks, so consider using AtomicCache instead of LocalCache when creating or updating cache values if a task uses async I/O to generate a cache value. AtomicCache has methods which provide mutual exclusion based on a cache key.

Task cancellation

A Cancellation provided to Worker::submit() may be used to request cancellation of the task in the worker. When cancellation is requested in the parent, the Cancellation provided to Task::run() is cancelled. The task may choose to ignore this cancellation request or act accordingly and throw a CancelledException from Task::run(). If the cancellation request is ignored, the task may continue and return a value which will be returned to the parent as though cancellation had not been requested.
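
A minimal sketch of a task that honours cancellation is shown below. It assumes amphp/parallel and its dependencies are installed through Composer and that the class lives in an autoloadable file. HashFilesTask is a hypothetical example class. The call to Amp\delay(0) reflects an assumption: since the body of the loop is blocking, the task briefly yields to the event loop so that a cancellation request sent by the parent has a chance to be processed before it is checked.

```php
<?php

// HashFilesTask.php (hypothetical file; must be loadable by the Composer autoloader)

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\delay;

final class HashFilesTask implements Task
{
    /** @param list<string> $paths Files to hash. */
    public function __construct(
        private readonly array $paths,
    ) {
    }

    public function run(Channel $channel, Cancellation $cancellation): array
    {
        $hashes = [];

        foreach ($this->paths as $path) {
            // Assumption: yield once so a pending cancellation request can be delivered.
            delay(0);

            // Throws Amp\CancelledException if the parent requested cancellation.
            $cancellation->throwIfRequested();

            $hashes[$path] = \hash_file('sha256', $path); // Blocking call
        }

        return $hashes;
    }
}
```

In the parent, such a task could be submitted with a Cancellation as the second argument to submit(), for example an Amp\TimeoutCancellation, and the resulting CancelledException caught around the call to await(). Checking between files rather than inside the hashing call keeps the task simple while still bounding how long a cancelled task keeps running.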

Worker Pools

The easiest way to use workers is through a worker pool. Worker pools can be used to submit tasks in the same way as a worker, but rather than using a single worker process, the pool uses multiple workers to execute tasks. This allows multiple tasks to be executed simultaneously.

The WorkerPool interface extends Worker, adding methods to get information about the pool or pull a single Worker instance out of the pool. A pool uses multiple Worker instances to execute Task instances.

If a set of tasks should be run within a single worker, use the WorkerPool::getWorker() method to pull a single worker from the pool. The worker is automatically returned to the pool when the instance returned is destroyed.

Global Worker Pool

A global worker pool is available and can be set using the function Amp\Parallel\Worker\workerPool(?WorkerPool $pool = null). Passing an instance of WorkerPool will set the global pool to the given instance. Invoking the function without an instance will return the current global instance.
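
As a rough sketch of how the pool functions fit together, the snippet below replaces the global pool and then pulls a single worker from it. It assumes amphp/parallel is installed through Composer, that ContextWorkerPool accepts a worker limit as its first constructor argument, and that vendor/autoload.php sits next to the script. The limit of 4 is an arbitrary value chosen for illustration.

```php
<?php

require __DIR__ . '/vendor/autoload.php'; // Path is an assumption; adjust to your project.

use Amp\Parallel\Worker;
use Amp\Parallel\Worker\ContextWorkerPool;

// Replace the global pool with one capped at 4 workers (arbitrary example limit).
Worker\workerPool(new ContextWorkerPool(4));

// Called without arguments, the function returns the current global pool.
$pool = Worker\workerPool();

// Pull one worker out of the pool. Tasks submitted to $worker run in that
// single worker, which is returned to the pool once $worker is destroyed.
$worker = $pool->getWorker();
```

Setting the global pool once during bootstrap keeps the limit in one place, while code elsewhere can keep calling Worker\submit() without knowing how the pool was configured.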

Child Processes or Threads

Contexts simplify writing and running PHP in parallel. A script written to be run in parallel must return a callable that will be run in a child process or thread. The callable receives a single argument – an instance of Channel that can be used to send data between the parent and child processes or threads. Any serializable data can be sent across this channel. The Context object, which extends the Channel interface, is the other end of the communication channel.

Contexts are created using a ContextFactory. DefaultContextFactory will use the best available method of creating a context, creating a thread if ext-parallel is installed or otherwise using a child process. ThreadContextFactory (requires a ZTS build of PHP 8.2+ and ext-parallel to create threads) and ProcessContextFactory are also provided should you wish to create a specific context type.

In the example below, a child process or thread is used to call a blocking function (file_get_contents() is only an example of a blocking function, use http-client for non-blocking HTTP requests). The result of that function is then sent back to the parent using the Channel object. The return value of the child callable is available using the Context::join() method.

Child Process or Thread

// child.php

use Amp\Sync\Channel;

return function (Channel $channel): mixed {
    $url = $channel->receive();

    $data = file_get_contents($url); // Example blocking function

    $channel->send($data);

    return 'Any serializable data';
};

Parent Process

// parent.php

use function Amp\Parallel\Context\contextFactory;

// Creates and starts a child process or thread.
$context = contextFactory()->start(__DIR__ . '/child.php');

$url = 'https://google.com';
$context->send($url);

$requestData = $context->receive();
printf("Received %d bytes from %s\n", \strlen($requestData), $url);

$returnValue = $context->join();
printf("Child process exited with '%s'\n", $returnValue);

Child processes or threads are also great for CPU-intensive operations such as image manipulation or for running daemons that perform periodic tasks based on input from the parent.

Context creation

An execution context can be created using the function Amp\Parallel\Context\startContext(), which uses the global ContextFactory. The global factory is an instance of DefaultContextFactory by default, but this instance can be overridden using the function Amp\Parallel\Context\contextFactory().

// Using the global context factory from Amp\Parallel\Context\contextFactory()
$context = Amp\Parallel\Context\startContext(__DIR__ . '/child.php');

// Creating a specific context factory and using it to create a context.
$contextFactory = new Amp\Parallel\Context\ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

Context factories are used by worker pools to create the context which executes tasks. Providing a custom ContextFactory to a worker pool allows custom bootstrapping or other behavior within pool workers.

IPC

A context is created with a single Channel which may be used to bidirectionally send data between the parent and child. Channels are a high-level data exchange, allowing serializable data to be sent over a channel. The Channel implementation handles serializing and unserializing data, message framing, and chunking over the underlying socket between the parent and child.
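
To make "message framing" concrete, here is a self-contained sketch of one common approach: each serialized payload is prefixed with its length, so the receiver can reassemble messages from arbitrary chunks. This is not the wire format amphp actually uses. encodeFrame() and decodeFrames() are hypothetical helpers, and the 4-byte big-endian length header is an assumption chosen for illustration.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: serialize a value and prefix it with a 4-byte big-endian length.
function encodeFrame(mixed $value): string
{
    $payload = serialize($value);

    return pack('N', strlen($payload)) . $payload;
}

/**
 * Hypothetical helper: append a chunk to the buffer and return every complete message.
 * Incomplete trailing bytes stay in $buffer until more data arrives.
 *
 * @return list<mixed>
 */
function decodeFrames(string &$buffer, string $chunk): array
{
    $buffer .= $chunk;
    $messages = [];

    while (strlen($buffer) >= 4) {
        $length = unpack('N', substr($buffer, 0, 4))[1];

        if (strlen($buffer) < 4 + $length) {
            break; // Wait for the rest of this frame.
        }

        $messages[] = unserialize(substr($buffer, 4, $length));
        $buffer = substr($buffer, 4 + $length);
    }

    return $messages;
}

// Two messages arrive split at an arbitrary byte boundary, as they might on a socket.
$stream = encodeFrame(['path' => '/tmp/a.png']) . encodeFrame('done');
$buffer = '';

$first = decodeFrames($buffer, substr($stream, 0, 10)); // No complete frame yet.
$second = decodeFrames($buffer, substr($stream, 10));   // Both messages are now complete.
```

Because a Channel takes care of this kind of bookkeeping, application code only ever sees whole values from send() and receive().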

Note: Channels should be used to send only data between the parent and child. Attempting to send resources such as database connections or file handles on a channel will not work. Such resources should be opened in each child process. One notable exception to this rule: server and client network sockets may be sent between parent and child using tools provided by amphp/cluster.
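
The short plain-PHP sketch below shows why a resource cannot travel over a channel, assuming the channel relies on native serialization. The message shape with 'path' and 'mode' keys is hypothetical. It illustrates the alternative of sending the information needed to open the resource on the other side.

```php
<?php

declare(strict_types=1);

// A stream resource, as returned by fopen().
$handle = fopen('php://memory', 'r+');

// serialize() cannot represent a resource, so the handle does not survive the round trip.
$roundTripped = unserialize(serialize($handle));
var_dump(is_resource($handle), is_resource($roundTripped));

// Sending what is needed to open the resource works instead (hypothetical message shape).
$message = ['path' => 'php://memory', 'mode' => 'r+'];
$received = unserialize(serialize($message));

// The receiving side opens its own handle.
$reopened = fopen($received['path'], $received['mode']);
var_dump(is_resource($reopened));
```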

The example code below defines a class, AppMessage, containing a message type enum and the associated message data, which depends on the enum case. All messages sent over the channel between the parent and child use an instance of AppMessage to define message intent. Alternatively, the child could use a different class for replies, but that was not done here for the sake of brevity. Any messaging strategy may be employed that is best suited to your application; the only requirement is that any structure sent over a channel must be serializable.

The example below sends a message to the child to process an image after receiving a path from STDIN, then waits for the reply from the child. When an empty path is provided, the parent sends null to the child to break the child out of the message loop and waits for the child to exit before exiting itself.

// AppMessage.php

class AppMessage {
    public function __construct(
        public readonly AppMessageType $type,
        public readonly mixed $data,
    ) {
    }
}

// AppMessageType.php

enum AppMessageType {
    case ProcessedImage;
    case ProcessImageFromPath;
    // Other enum cases for further message types...
}

// parent.php

use Amp\Parallel\Context\ProcessContextFactory;

$contextFactory = new ProcessContextFactory();
$context = $contextFactory->start(__DIR__ . '/child.php');

$stdin = Amp\ByteStream\getStdin();

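// Trim the trailing newline so that an empty line ends the loop.
while (($line = $stdin->read()) !== null && ($path = \trim($line)) !== '') {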
    $message = new AppMessage(AppMessageType::ProcessImageFromPath, $path);
    $context->send($message);

    $reply = $context->receive(); // Wait for reply from child context with processed image data.
}

$context->send(null); // End loop in child process.
$context->join();

// child.php

use Amp\Sync\Channel;

return function (Channel $channel): void {
    /** @var AppMessage|null $message */
    while ($message = $channel->receive()) {
        $reply = match ($message->type) {
            AppMessageType::ProcessImageFromPath => new AppMessage(
                AppMessageType::ProcessedImage,
                ImageProcessor::process($message->data),
            ),
            // Handle other message types...
        };
        
        $channel->send($reply);
    }
};

Creating an IPC socket

Sometimes it is necessary to create another socket for specialized IPC between a parent and child context. One such example is sending sockets between a parent and child process using ClientSocketReceivePipe and ClientSocketSendPipe, which are found in amphp/cluster. Such a socket can be created using an instance of IpcHub in the parent and the Amp\Parallel\Ipc\connect() function in the child.

The example below creates a separate IPC socket between a parent and child, then uses amphp/cluster to create instances of ClientSocketSendPipe and ClientSocketReceivePipe in the parent and child, respectively.

// parent.php
use Amp\Cluster\ClientSocketSendPipe;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Ipc\LocalIpcHub;

$ipcHub = new LocalIpcHub();

// Sharing the IpcHub instance with the context factory isn't required,
// but reduces the number of opened sockets.
$contextFactory = new ProcessContextFactory(ipcHub: $ipcHub); 

$context = $contextFactory->start(__DIR__ . '/child.php');

$connectionKey = $ipcHub->generateKey();
$context->send(['uri' => $ipcHub->getUri(), 'key' => $connectionKey]);

// $socket will be a bidirectional socket to the child.
$socket = $ipcHub->accept($connectionKey);

$socketPipe = new ClientSocketSendPipe($socket);

// child.php
use Amp\Cluster\ClientSocketReceivePipe;
use Amp\Sync\Channel;

return function (Channel $channel): void {
    ['uri' => $uri, 'key' => $connectionKey] = $channel->receive();
    
    // $socket will be a bidirectional socket to the parent.
    $socket = Amp\Parallel\Ipc\connect($uri, $connectionKey);
    
    $socketPipe = new ClientSocketReceivePipe($socket);
};

Debugging

Step debugging may be used in child processes with PhpStorm and Xdebug by listening for debug connections in the IDE.

In PhpStorm settings, under PHP > Debug, ensure the box "Can accept external connections" is checked. The specific ports used are not important, yours may differ.

[Screenshot: PhpStorm Xdebug settings]

For child processes to connect to the IDE and stop at breakpoints set in the child processes, turn on listening for debug connections.

[Screenshots: debug listening off and debug listening on]

No PHP ini settings need to be set manually. Settings set by PhpStorm when invoking the parent PHP process will be forwarded to child processes.

Run the parent script in debug mode from PhpStorm with breakpoints set in code executed in the child process. Execution should stop at any breakpoints set in the child.

[Screenshot: debugger running]

When stopping at a breakpoint in a child process, execution of the parent process and any other child processes will continue. PhpStorm will open a new debugger tab for each child process connecting to the debugger, so you may need to limit the number of child processes created when debugging, or the number of connections may become overwhelming. If you set breakpoints in both the parent and child process, you may need to switch between debug tabs to resume both the parent and child.

Versioning

amphp/parallel follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please use the private security issue reporter instead of using the public issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

parallel's People

Contributors

andreybolonin, bwoebi, bzikarsky, carusogabriel, danog, daverandom, ekstazi, enumag, gemorroj, irazasyed, ishan-biztech, kelunik, kipanshi, maciejkosiarski, natorator, peehaa, peter279k, rybakit, sagebind, staabm, stefanotorresi, straube, thgs, tommy-muehle, trowski, voku, yched

parallel's Issues

Call to a member function connection() on null with PHP 7.1

I was using this effectively until I upgraded to 7.1. I don't think that's the issue, as I haven't been able to find anyone else with it, but the error message is not descriptive, so I'm not sure what's going on. With the basic code example:

use function Amp\ParallelFunctions\parallelMap;
use function Amp\Promise\wait;

$responses = wait(parallelMap([
    'https://google.com/',
    'https://github.com/',
    'https://stackoverflow.com/',
], function ($url) {
    return file_get_contents($url);
}));

I get this error:

An uncaught Exception was encountered
Type: Amp\MultiReasonException

Message: Multiple errors encountered

Filename: /home/orgname/public_html/vendor/amphp/parallel-functions/src/functions.php

Line Number: 56

Backtrace:

File: /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php
Line: 76
Function: send

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 130
Function: Amp\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Deferred.php
Line: 45
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/functions.php
Line: 466
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/ResolutionQueue.php
Line: 55
Function: Amp\Promise\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php
Line: 29
Function: __invoke

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 125
Function: onResolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 155
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php
Line: 95
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php
Line: 29
Function: Amp\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 125
Function: onResolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 155
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php
Line: 95
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php
Line: 29
Function: Amp\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 125
Function: onResolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 155
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php
Line: 95
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php
Line: 29
Function: Amp\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 125
Function: onResolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 155
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php
Line: 95
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php
Line: 29
Function: Amp\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 125
Function: onResolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php
Line: 155
Function: resolve

File: /home/orgname/public_html/vendor/amphp/amp/lib/Deferred.php
Line: 55
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/functions.php
Line: 214
Function: fail

File: /home/orgname/public_html/vendor/amphp/amp/lib/Loop/NativeDriver.php
Line: 130
Function: Amp\Promise\{closure}

File: /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php
Line: 134
Function: dispatch

File: /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php
Line: 72
Function: tick

File: /home/orgname/public_html/vendor/amphp/amp/lib/Loop.php
Line: 84
Function: run

File: /home/orgname/public_html/vendor/amphp/amp/lib/functions.php
Line: 170
Function: run

File: /home/orgname/public_html/application/controllers/Lists.php
Line: 63
Function: Amp\Promise\wait

File: /home/orgname/public_html/index.php
Line: 315
Function: require_once

If I print out the error on line 56 of /home/orgname/public_html/vendor/amphp/parallel-functions/src/functions.php,
I see a very long error. Here's some of it:

Array ( [0] => Amp\Parallel\Context\ContextException Object ( [message:protected] => Starting the process failed [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/parallel/lib/Context/Process.php [line:protected] => 197 [trace:Exception:private] => Array ( [0] => Array ( [function] => Amp\Parallel\Context\{closure} [class] => Amp\Parallel\Context\Process [type] => -> [args] => Array ( ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php [line] => 73 [function] => throw [class] => Generator [type] => -> [args] => Array ( [0] => Amp\Parallel\Context\ContextException Object ( [message:protected] => Starting the process timed out [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/parallel/lib/Context/Internal/ProcessHub.php [line:protected] => 118 [trace:Exception:private] => Array ( [0] => Array ( [function] => Amp\Parallel\Context\Internal\{closure} [class] => Amp\Parallel\Context\Internal\ProcessHub [type] => -> [args] => Array ( ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Coroutine.php [line] => 73 [function] => throw [class] => Generator [type] => -> [args] => Array ( [0] => Amp\TimeoutException Object ( [message:protected] => Operation timed out [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line:protected] => 214 [trace:Exception:private] => Array ( [0] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/NativeDriver.php [line] => 130 [function] => Amp\Promise\{closure} [args] => Array ( [0] => r [1] => ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 134 [function] => dispatch [class] => Amp\Loop\NativeDriver [type] => -> [args] => Array ( [0] => 1 ) ) [2] => Array ( [file] => 
/home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 72 [function] => tick [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) ) [3] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop.php [line] => 84 [function] => run [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) ) [4] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line] => 170 [function] => run [class] => Amp\Loop [type] => :: [args] => Array ( [0] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => [promise] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) ) ) [5] => Array ( [file] => /home/orgname/public_html/application/controllers/Lists.php [line] => 63 [function] => Amp\Promise\wait [args] => Array ( [0] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => 
[exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) [6] => Array ( [file] => /home/orgname/public_html/system/core/CodeIgniter.php [line] => 532 [function] => testme [class] => Lists [type] => -> [args] => Array ( ) ) [7] => Array ( [file] => /home/orgname/public_html/index.php [line] => 315 [args] => Array ( [0] => /home/orgname/public_html/system/core/CodeIgniter.php ) [function] => require_once ) ) [previous:Exception:private] => ) ) ) [2] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Failure.php [line] => 29 [function] => Amp\{closure} [class] => Amp\Coroutine [type] => -> [args] => Array ( [0] => Amp\TimeoutException Object ( [message:protected] => Operation timed out [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line:protected] => 214 [trace:Exception:private] => Array ( [0] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/NativeDriver.php [line] => 130 [function] => Amp\Promise\{closure} [args] => Array ( [0] => r [1] => ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 134 [function] => dispatch [class] => Amp\Loop\NativeDriver [type] => -> [args] => Array ( [0] => 1 ) ) [2] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 72 [function] => tick [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) ) [3] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop.php [line] => 84 [function] => run [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) ) [4] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line] => 170 [function] => run [class] => Amp\Loop [type] => :: [args] => Array ( [0] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => [promise] => Amp\Coroutine Object ( 
[generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) ) ) [5] => Array ( [file] => /home/orgname/public_html/application/controllers/Lists.php [line] => 63 [function] => Amp\Promise\wait [args] => Array ( [0] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) [6] => Array ( [file] => /home/orgname/public_html/system/core/CodeIgniter.php [line] => 532 [function] => testme [class] => Lists [type] => -> [args] => Array ( ) ) [7] => Array ( [file] => /home/orgname/public_html/index.php [line] => 315 [args] => Array ( [0] => /home/orgname/public_html/system/core/CodeIgniter.php ) [function] => require_once ) ) [previous:Exception:private] => ) [1] => ) ) [3] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php [line] => 125 
[function] => onResolve [class] => Amp\Failure [type] => -> [args] => Array ( [0] => Closure Object ( [this] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => [resolved:Amp\Coroutine:private] => 1 [result:Amp\Coroutine:private] => Amp\Failure Object ( [exception:Amp\Failure:private] => Amp\Parallel\Context\ContextException Object *RECURSION* ) [onResolved:Amp\Coroutine:private] => [resolutionTrace:Amp\Coroutine:private] => ) [parameter] => Array ( [$exception] => [$value] => ) ) ) ) [4] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Internal/Placeholder.php [line] => 155 [function] => resolve [class] => class@anonymous/home/orgname/public_html/vendor/amphp/amp/lib/Deferred.php0x2b30279492d7 [type] => -> [args] => Array ( [0] => Amp\Failure Object ( [exception:Amp\Failure:private] => Amp\TimeoutException Object ( [message:protected] => Operation timed out [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line:protected] => 214 [trace:Exception:private] => Array ( [0] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/NativeDriver.php [line] => 130 [function] => Amp\Promise\{closure} [args] => Array ( [0] => r [1] => ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 134 [function] => dispatch [class] => Amp\Loop\NativeDriver [type] => -> [args] => Array ( [0] => 1 ) ) [2] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 72 [function] => tick [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) ) [3] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop.php [line] => 84 [function] => run [class] => Amp\Loop\Driver [type] => -> [args] => Array ( ) 
) [4] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line] => 170 [function] => run [class] => Amp\Loop [type] => :: [args] => Array ( [0] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => [promise] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) ) ) [5] => Array ( [file] => /home/orgname/public_html/application/controllers/Lists.php [line] => 63 [function] => Amp\Promise\wait [args] => Array ( [0] => Amp\Coroutine Object ( [generator:Amp\Coroutine:private] => Generator Object ( ) [onResolve:Amp\Coroutine:private] => Closure Object ( [this] => Amp\Coroutine Object *RECURSION* [parameter] => Array ( [$exception] => [$value] => ) ) [immediate:Amp\Coroutine:private] => 1 [exception:Amp\Coroutine:private] => [value:Amp\Coroutine:private] => Array ( [0] => Array *RECURSION* [1] => Array ( ) ) [resolved:Amp\Coroutine:private] => [result:Amp\Coroutine:private] => [onResolved:Amp\Coroutine:private] => Closure Object ( [static] => Array ( [resolved] => [value] => [exception] => ) [parameter] => Array ( [$e] => [$v] => ) ) [resolutionTrace:Amp\Coroutine:private] => ) ) ) [6] => Array ( [file] => /home/orgname/public_html/system/core/CodeIgniter.php [line] => 532 [function] => testme [class] => Lists [type] => -> [args] => Array ( ) ) [7] => Array ( [file] => /home/orgname/public_html/index.php 
[line] => 315 [args] => Array ( [0] => /home/orgname/public_html/system/core/CodeIgniter.php ) [function] => require_once ) ) [previous:Exception:private] => ) ) ) ) [5] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Deferred.php [line] => 55 [function] => fail [class] => class@anonymous/home/orgname/public_html/vendor/amphp/amp/lib/Deferred.php0x2b30279492d7 [type] => -> [args] => Array ( [0] => Amp\TimeoutException Object ( [message:protected] => Operation timed out [string:Exception:private] => [code:protected] => 0 [file:protected] => /home/orgname/public_html/vendor/amphp/amp/lib/functions.php [line:protected] => 214 [trace:Exception:private] => Array ( [0] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/NativeDriver.php [line] => 130 [function] => Amp\Promise\{closure} [args] => Array ( [0] => r [1] => ) ) [1] => Array ( [file] => /home/orgname/public_html/vendor/amphp/amp/lib/Loop/Driver.php [line] => 134 [function] => dispatch [class] => Amp\Loop\NativeDriver [type] => -> [args] => Array ( [0] => 1 ) ) [2] => Array ( [file] => 

Any help would be appreciated!

Undefined Constant STDERR in parallel/lib/Context/Process

I'm using php-fpm 7.0.22 + nginx.
When I tried to execute this demo code from the artax documentation, it showed the error message in the title.

        foreach ($requests as $request) {
            $promises[] = call(function () use ($defaultClient, $request) {
                $response = yield $defaultClient->request($request);
                $body = yield $response->getBody();
                return $body;
            });
        }
        $response = wait(all($promises));

I think this issue is related to #26. I'm using artax 3.0.13 and parallel ^0.2. Has this issue not been resolved completely?
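For context, PHP only predefines the STDIN, STDOUT and STDERR constants in the CLI SAPI, so any code that references STDERR fails under php-fpm. A minimal user-side sketch of a fallback follows; it is a workaround under the assumption that writing to `php://stderr` is acceptable in the environment, not the library's own fix, and it may only get past this first error.

```php
<?php
// STDIN, STDOUT and STDERR are predefined by the CLI SAPI only.
// Under php-fpm the constant is missing, so define a fallback once.
if (!defined('STDERR')) {
    // Assumption: opening php://stderr for writing is acceptable here.
    define('STDERR', fopen('php://stderr', 'wb'));
}

// From here on, code referencing STDERR finds a stream resource.
$stderrIsStream = is_resource(STDERR);
```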

Support delayed port access (e.g. due to firewalls)

@kelunik told me to open a ticket here.

Please open an issue at https://github.com/amphp/parallel for the infinite loop and https://github.com/amphp/process for the ports if you want.

It's been quite a while, but since the problem with box was easily solved by using the --no-parallel option, I simply forgot about it until I hit the problem again with fink.

I am not sure which info to post where; it feels like I would post the same text in both parallel and process. I haven't used amphp directly, only tools that use it, so I don't know the internals.

If I run fink on a Windows system with a firewall, it simply gets stuck forever.

php fink.phar https://getcomposer.org

console

If my firewall is in learning mode, it asks me to allow a local connection for ampXXXX.tmp.

firewall-dialog

After allowing it, the connection monitor shows the following:

connections

These 3 php and amp processes stay there forever in an infinite loop, doing nothing (nothing beyond the initial 22 bytes is sent or received).

If I run the firewall in allow-all mode (which sadly defeats its purpose), it works without a problem.

So it looks like amphp has a problem if the port is not instantly accessible but is delayed by a firewall learning-mode dialog.
As the amp process names are randomly generated, it is not possible to create the firewall rules in advance.
Some kind of check whether the ports can be accessed, with a fallback if they cannot, could be a solution. Slow synchronous processing is still faster than processing that hangs forever doing nothing :)

Amp\Parallel\ChannelException: The socket unexpectedly closed

This exception is thrown roughly 1 out of 10 times when running this test. The test takes the path of an image, scales it down and saves it, about 10 times. Each downscaling runs in a separate Fork.

This is the test code.

    public function test_async() {
        $url = 'img/image.jpeg';
        $factory = new ResponsiveFactory(new DefaultConfigurator([
            'publicPath'   => $this->publicPath,
            'engine'       => 'gd',
            'stepModifier' => 0.5,
            'scaler'       => 'filesize',
            'enableCache'  => false,
            'async'        => true,
        ]));

        $responsiveImage = $factory->create($url);

        $this->assertTrue(count($responsiveImage->getSrcset()) > 1);
        $this->assertEquals("/{$url}", $responsiveImage->src());

        $testCase = $this;
        $responsiveImage->onSaved(function () use ($testCase, $responsiveImage) {
            $fs = new Filesystem();

            foreach ($responsiveImage->getSrcset() as $src) {
                $src = trim($src, '/');

                $testCase->assertTrue($fs->exists("{$testCase->publicPath}/{$src}"));
            }
        });

        \Amp\wait($responsiveImage->getMainPromise());
    }

And this is the exception trace.

PHPUnit 5.7.14 by Sebastian Bergmann and contributors.

E                                                                   1 / 1 (100%)

Time: 673 ms, Memory: 30.00MB

There was 1 error:

1) Brendt\Image\Tests\Phpunit\ResponsiveFactoryTest::test_async
Amp\Parallel\ChannelException: The socket unexpectedly closed

/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/parallel/lib/Sync/ChannelledSocket.php:91
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/loop/lib/NativeLoop.php:113
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/loop/lib/NativeLoop.php:42
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/loop/lib/Loop.php:96
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/loop/lib/Loop.php:47
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/async-interop/event-loop/src/Loop.php:78
/Users/brent1/Documents/sites/brendt/responsive-image/vendor/amphp/amp/lib/functions.php:116
/Users/brent1/Documents/sites/brendt/responsive-image/tests/phpunit/ResponsiveFactoryTest.php:164

The code for downscaling images:

$factory = $this;

$promise = Fork::spawn(function () use ($factory, $imageObject, $width, $height, $scaledFilePath) {
    $scaledImage = $imageObject->resize((int) $width, (int) $height)->encode($imageObject->extension);
    $factory->saveImageFile($scaledFilePath, $scaledImage);
})->join();

return $promise;

All promises are combined into one using \Amp\all().

Worker examples did not work

Hi,

I wanted to try the worker examples but got an error:

Uncaught Error: Call to undefined function Amp\Promise\capture() in ... lib/Process/ChannelledProcess.php:100

Executed with PHP 7.1.4. Let me know if you need more information.

Can't use objects instances inside Thread::spawn

I am using the following thread example: https://github.com/amphp/parallel/blob/master/examples/thread.php

It works fine. However, when I try to use an object inside spawn, it does not work.

Without object instance

<?php

require(dirname(__DIR__) . '/autoload.php');

use Amp\Loop;
use Amp\Parallel\Threading\Thread;
use Amp\Delayed;

Loop::run(function () {
    $timer = Loop::repeat(1000, function () {
        static $i;
        $i = $i ? ++$i : 1;
        print "Demonstrating how alive the parent is for the {$i}th time.\n";
    });
    try {
        // Create a new child thread that does some blocking stuff.
        $context1 = Thread::spawn(function () {
            echo "inside spawn \n";
            sleep(1);
            echo "end spawn \n";
        });

        yield new Delayed(2000);
        yield $context1->send([1]);
    } finally {
        Loop::cancel($timer);
    }
});

The output is :

$ zts-php modules/Jobs/scripts/test.php
inside spawn 
Demonstrating how alive the parent is for the 1th time.
end spawn 

With object instance

require(dirname(__DIR__) . '/autoload.php');

use Amp\Loop;
use Amp\Parallel\Threading\Thread;
use Amp\Delayed;

class A {
    public function get() {
        return "a value";
    }
}
Loop::run(function () {
    $timer = Loop::repeat(1000, function () {
        static $i;
        $i = $i ? ++$i : 1;
        print "Demonstrating how alive the parent is for the {$i}th time.\n";
    });
    try {
        // Create a new child thread that does some blocking stuff.
        $context1 = Thread::spawn(function () {
            echo "inside spawn \n";
            $a = new A();
            echo $a->get();
            echo "end spawn \n";
        });

        yield new Delayed(2000);
        yield $context1->send([1]);
    } finally {
        Loop::cancel($timer);
    }
});

The output is :

$ zts-php modules/Jobs/scripts/test.php
inside spawn 
Demonstrating how alive the parent is for the 1th time.

Some additional information:

  • Working with centos 7.3
  • Working with PHP 7.2
  • Working with Zend Framework
  • Working with pthreads.so extension

Any suggestions? I think that my class A must implement some interface or extend some other class from the Amp package, but I cannot find any documentation.

PooledWorker::__destruct causes fatal error

How to reproduce it:

  1. getWorker from pool
  2. Kill or shutdown it.
  3. Get all other workers from pool.
  4. Trigger gc to work.

Because the worker is not running, amp detaches it from the SplObjectStorage. If the PooledWorker then returns the worker to the storage:

PHP Fatal error:  Uncaught UnexpectedValueException: Object not found in /app/vendor/amphp/parallel/lib/Worker/DefaultPool.php:73
Stack trace:
#0 /app/vendor/amphp/parallel/lib/Worker/DefaultPool.php(73): SplObjectStorage->offsetGet(Object(Amp\Parallel\Worker\WorkerProcess))
#1 /app/vendor/amphp/parallel/lib/Worker/Internal/PooledWorker.php(33): Amp\Parallel\Worker\DefaultPool::Amp\Parallel\Worker\{closure}(Object(Amp\Parallel\Worker\WorkerProcess))
#2 [internal function]: Amp\Parallel\Worker\Internal\PooledWorker->__destruct()
#3 {main}
  thrown in /app/vendor/amphp/parallel/lib/Worker/DefaultPool.php on line 73
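The trace above comes from `SplObjectStorage::offsetGet()`, which throws `UnexpectedValueException` when the object is no longer in the storage. A minimal sketch of a guarded release, assuming a reference-counting storage like the one the trace suggests; `releaseWorker()` and the `stdClass` stand-in are hypothetical names for illustration, not the library's code.

```php
<?php
/**
 * Decrement the usage count of a worker, tolerating workers that were
 * already detached (for example because they were killed or shut down).
 * Hypothetical helper, not part of amphp/parallel.
 */
function releaseWorker(SplObjectStorage $busy, object $worker): bool
{
    // offsetExists() reports membership without throwing.
    if (!$busy->offsetExists($worker)) {
        return false; // already detached, nothing to release
    }

    $count = $busy[$worker] - 1;
    if ($count <= 0) {
        unset($busy[$worker]);   // last user gone, drop the entry
    } else {
        $busy[$worker] = $count; // still in use elsewhere
    }

    return true;
}

$busy = new SplObjectStorage();
$worker = new stdClass();        // stands in for a worker object
$busy[$worker] = 1;

$first = releaseWorker($busy, $worker);  // releases and detaches
$second = releaseWorker($busy, $worker); // worker is gone, no exception
```

Without the `offsetExists()` check, the second call would read `$busy[$worker]` on a detached object and raise the same "Object not found" error.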

Not able to locate PHP Binary on macOS

I was trying this library today with brew installed PHP packages and I ended up with this exception:

Could not locate PHP executable binary

Turns out, getenv('PATH') returns false.

I tried Symfony's PhpExecutableFinder and it works fine and is able to locate the binary.

Would it be possible to either use Symfony's PhpExecutableFinder or change the existing code to do what Symfony does to locate the binary?
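One way to avoid depending on `PATH` alone is to try the `PHP_BINARY` constant first and only then fall back to scanning `PATH`. The sketch below is a simplified lookup with a hypothetical function name, `locatePhpBinary()`; it is neither the library's code nor a copy of Symfony's finder, which performs more checks.

```php
<?php
/**
 * Try to find a PHP executable without relying solely on PATH.
 * Hypothetical helper for illustration.
 */
function locatePhpBinary(): ?string
{
    // PHP_BINARY holds the path of the running binary; it can be empty
    // under some SAPIs, so it is checked before use.
    if (PHP_BINARY !== '' && is_executable(PHP_BINARY)) {
        return PHP_BINARY;
    }

    $path = getenv('PATH');
    if ($path === false || $path === '') {
        return null; // nothing left to search
    }

    $name = DIRECTORY_SEPARATOR === '\\' ? 'php.exe' : 'php';
    foreach (explode(PATH_SEPARATOR, $path) as $dir) {
        $candidate = $dir . DIRECTORY_SEPARATOR . $name;
        if (is_executable($candidate)) {
            return $candidate;
        }
    }

    return null;
}

$binary = locatePhpBinary();
```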

Thanks!

"The channel is has been closed" when we use more than 1 parallel process

Hello. I tried to use this code to fetch data from an array of URLs:

Loop::run(function ()  {
    $uris = [
        'http://local.test/sleep.php?sec=1',
        'http://local.test/sleep.php?sec=5',
    ];

    $threads = [];
    foreach ($uris as $key => $uri) {

        $threads[] = Thread::spawn(function ($uri) {
            $result = file_get_contents($uri);
            return $result;
        }, $uri);
    }

    foreach ($threads as $thread) {
        $result[] = yield $thread->join();
    }
});

Code from sleep.php : <?php sleep($_GET['sec']);

This works correctly, but if I change the order of the URLs from
'http://local.test/sleep.php?sec=1',
'http://local.test/sleep.php?sec=5',
to
'http://local.test/sleep.php?sec=5',
'http://local.test/sleep.php?sec=1',

I get this error every time:

Fatal error: Uncaught Amp\Parallel\Sync\ChannelException: The channel is has been closed in /home/anton/www/test/php/parallel/vendor/amphp/parallel/lib/Sync/ChannelledSocket.php:251

Could you tell me how I can solve this problem?

PosixSemaphore spends too much time in its loop

do {
    // Attempt to acquire a lock from the semaphore.
    if (@\msg_receive($this->queue, 0, $type, 1, $chr, false, \MSG_IPC_NOWAIT, $errno)) {
        // A free lock was found, so resolve with a lock object that can
        // be used to release the lock.
        return new Lock(function (Lock $lock) {
            $this->release();
        });
    }
    // Check for unusual errors.
    if ($errno !== \MSG_ENOMSG) {
        throw new SemaphoreException('Failed to acquire a lock.');
    }
} while (yield new Delayed(self::LATENCY_TIMEOUT, true));

This code tries to acquire the lock in a polling loop.

If 1000 coroutines try to acquire it, 1000 coroutines are repeatedly rescheduled by Amp\Loop.


For example (with the uv extension):

Loop::run(function () {
    $client = new DefaultClient();

    for ($i = 0; $i < 1000; $i++) {
        $resp = yield $client->request('http://localhost:4444/');
        unset($resp);
    }
});
real	0m1.443s
user	0m0.971s
sys	0m0.056s

Loop::run(function () {
    $s = new PosixSemaphore(2);
    $client = new DefaultClient();

    $coroutines = [];
    for ($i = 0; $i < 1000; $i++) {
        $coroutines = new Coroutine(request($client, $s));
    }
    yield $coroutines;
});

function request($client, &$s)
{
    $lock = yield $s->acquire();
    yield $client->request('http://localhost:4444/');
    $lock->release();
}
real	0m19.484s
user	0m14.544s
sys	0m0.354s
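For comparison, polling can be avoided when all waiters live in the same process: each waiter is queued and woken exactly once when a lock is released. The sketch below illustrates that idea with plain callbacks. `QueueSemaphore` is a hypothetical class, it is not how PosixSemaphore works, and it does not provide the cross-process locking that PosixSemaphore is meant for.

```php
<?php
/**
 * In-process counting semaphore that queues waiters instead of polling.
 * Hypothetical illustration only; no cross-process guarantees.
 */
final class QueueSemaphore
{
    /** @var int Number of currently free locks. */
    private $free;

    /** @var callable[] Waiters in FIFO order. */
    private $waiters = [];

    public function __construct(int $locks)
    {
        $this->free = $locks;
    }

    /** Runs $onAcquired immediately if a lock is free, else queues it. */
    public function acquire(callable $onAcquired): void
    {
        if ($this->free > 0) {
            $this->free--;
            $onAcquired();
            return;
        }
        $this->waiters[] = $onAcquired;
    }

    /** Hands the lock directly to the next waiter, if there is one. */
    public function release(): void
    {
        $next = array_shift($this->waiters);
        if ($next !== null) {
            $next();
            return;
        }
        $this->free++;
    }
}

$semaphore = new QueueSemaphore(2);
$order = [];
for ($i = 0; $i < 3; $i++) {
    $semaphore->acquire(function () use (&$order, $i) {
        $order[] = $i;
    });
}
$beforeRelease = $order;  // only two waiters got a lock so far
$semaphore->release();    // wakes the third waiter, no polling involved
```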

Make classes autoloadable

Hi there,

When implementing a Task I get this error message:

Uncaught Error in worker with message "Classes implementing Amp\Parallel\Worker\Task must be autoloadable by the Composer autoloader" and code "0"

<?php

namespace Jobs;

use Phalcon\Mvc\Controller;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;

class GetMarketJob extends Controller implements Task {
    /**
     * @var callable
     */
    private $function;
    /**
     * @var mixed[]
     */
    private $args;

    public function execute(callable $function, ...$args) {
        $this->function = $function;
        $this->args = $args;
    }

    /**
     * {@inheritdoc}
     */
    public function run(Environment $environment) {
        return ($this->function)(...$this->args);
    }

    public function getArgs() {
        return $this->args;
    }    
}

How do I solve this?

How to stop a loop and waiting gracefully for end of current spawned sub-processes

Hi Niklas @kelunik

one more but different question.

I can't figure out how to stop a loop and wait until the current active sub-processes are finished.

My code so far:

        // define worker pool
        $pool = new DefaultPool($cnt_threads);

        // loop through files
        foreach ($files as $file) {
          // create the task and push to pool
          $task = new FileTask($file);
          $pool->enqueue($task);
        }

        // event loop for parallel tasks
        Loop::run(function () use (&$pool) {

          // What to do when a signal is received
          foreach (array(SIGINT, SIGHUP, SIGTERM) as $signal) {
            Loop::unreference(
              Loop::onSignal($signal,
                function () use ($signal) {
                  echo "\nABORT: caught SIGNAL [" . $signal . "]! Trying gracefully shutdown ...\n";
                }
              )
            );
          }

          return $pool->shutdown();
        });

If I now press CTRL-C, the message comes up each time I press the keys, and the Loop waits until the current tasks are finished and then stops.

BUT

If I send kill -2 process-id from the console, only the message comes up, but the Loop keeps running until all tasks are done.

  

I'm wondering what to do to gracefully break out of Loop::run(). Did you implement a special handler so that CTRL-C works?

  

If I add Loop::stop(); in the onSignal function, the Loop immediately stops and the main-app ends. BUT all current sub-processes are still running and visible as processes (ps -ef).

  
  
Thanks for your help
Tom

Documentation/stable release

What's the status of this library? Is it still going to replace amphp/thread? If so, will there be a stable release and possibly documentation?

Context apparently died but still running?

Hi,

I've been using amp and parallel for about a year with no issues at all. Recently I updated both using Composer and now, from time to time, I'm getting this error:

[2018-05-04 08:30:04] app.CRITICAL: Error sending ID 4833. Error: The channel closed unexpectedly. Did the context die? {"script":"Watcher.php"} []

The weird thing is that, according to my logs, the thread is still running:

[2018-05-04 08:30:04] app.INFO: Processing ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:30:04] app.CRITICAL: Error sending ID 4833. Error: The channel closed unexpectedly. Did the context die? {"script":"Watcher.php"} []
----- AT THIS POINT THE THREAD SHOULD BE DEAD, BUT I'M GETTING LOGS FROM IT ----
[2018-05-04 08:32:45] app.INFO: Sending batch 0 to 1000 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:32:50] app.INFO: Batch 0 sent ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:33:51] app.INFO: Sending batch 1 to 1000 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:33:56] app.INFO: Batch sent 1 ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:19] app.INFO: Sending batch 2 to 1000 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:26] app.INFO: Batch sent 2 ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:29] app.INFO: Sending batch 3 to 1000 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:38] app.INFO: Batch sent 3 ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:40] app.INFO: Sending batch 4 to 1000 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:48] app.INFO: Batch sent 4 ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:50] app.INFO: Sending batch 5 to 278 emails ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:52] app.INFO: Batch sent 5 ID 4833 {"script":"Watcher.php"} []
[2018-05-04 08:34:52] app.INFO: Invalid emails ID 4833: 0 | Array ( )  {"script":"Watcher.php"} []
[2018-05-04 08:34:52] app.INFO:  Done ID 4833 | Emails sent: 5278 | Final state: sent {"script":"Watcher.php"} []

Hope you can help me, thanks!

Use custom error handler

We should use a custom error handler to transfer errors back to the parent process. We need to figure out how to deal with startup errors. I think we should use -d to only log them to STDERR. Another alternative would be simply parsing them. See https://travis-ci.org/amphp/file/jobs/245354788#L1645 which fails because UV can't be loaded, as compiling it fails.

Using worker in php fpm

To reply to a user request faster, I want to start multiple parallel jobs, wait for all of them to finish, collect their responses and return them to the user. I'd like to use this library for that.

As a start I tried the worker and worker-pool example within my php-fpm process and got the following error: "Undefined constant 'STDERR' in /var/www/vendor/amphp/parallel/lib/Process/ChannelledProcess.php on line 82"

When I define STDERR myself, I get the next error:
"Uncaught Amp\Parallel\Sync\ChannelException: Invalid header received: [...] in /var/www/vendor/amphp/parallel/lib/Worker/AbstractWorker.php on line 172"

I'm running the code in the Docker image php:7.1-fpm-alpine with only a few additional extensions. None of the suggested (but apparently not required) extensions for threads or forks are installed.

Is this library restricted to cli use? The readme doesn't state anything like this.

Running slower with amphp/parallel than when running code synchronously

When running the code in parallel, the measurements show it running about 3 seconds slower than without. System is Ubuntu 16, PHP 7 FPM/FastCGI, amphp/parallel "v0.2.3" (reference 11a3e27), 4 cores, 16 GB of RAM.

Code samples are below. Unfortunately, the bodies of these methods are too long to post as a snippet, but Foo() mainly just waits on an HTTP GET request and does some light pre/post processing. Bar() likewise waits on a SOAP request and does some light pre/post processing.

Without parallel about 8 seconds

if (!empty($fooProperties)) {
    $Foo = new Foo();
    $results['foo'] = $Foo->properties($fooProperties, $request);
}

if (!empty($barProperties)) {
    $Bar = new Bar();
    $results['bar'] = $Bar->properties($barProperties, $request);
}

foreach ($results as $r) {
    foreach ($r as $property_id => $p) {
        if (isset($properties[$property_id])) {
            $properties[$property_id] = $p;
        }
    }
}

With parallel about 11 seconds

if (!empty($fooProperties)) {
    $Foo = new Foo();
    $promises['foo'] = parallel(function () use($Foo, $fooProperties, $request) {
        return $Foo->properties($fooProperties, $request);
    })();
}

if (!empty($barProperties)) {
    $Bar = new Bar();
    $promises['bar'] = parallel(function () use($Bar, $barProperties, $request) {
        return $Bar->properties($barProperties, $request);
    })();
}


if (!empty($promises)) {
    $results = \Amp\Promise\wait(\Amp\Promise\all($promises));
    foreach ($results as $r) {
        foreach ($r as $property_id => $p) {
            if (isset($properties[$property_id])) {
                $properties[$property_id] = $p;
            }
        }
    }
}

Any ideas here?

PHP Fatal error: Could not send result to parent

Hi,

While experimenting a bit I am seeing the following error in my logs:

 PHP Fatal error:  Could not send result to parent in /socket/vendor/amphp/parallel/lib/Context/Internal/process-runner.php on line 105

I've done some debugging, and the root cause is that the child process process-runner.php is not stopped gracefully. When the loop ends, it is forcefully killed via Runner->kill(), which causes an exception, and then the error is triggered.

It's reproducible by using the simple http client example script. A simple setup to reproduce using Docker:

Run:

docker run --rm -it php:7.2 bash -l

And then just paste:

   apt-get update \
&& apt-get install --no-install-recommends -y git unzip \
&& curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin/ --filename=composer \
&& git clone https://github.com/amphp/socket.git \
&& cd socket \
&& composer install --no-dev \
&& echo '[PHP]
error_log=/proc/1/fd/2
' > /usr/local/etc/php/php.ini \
&& (php -S localhost:80 &) \
&& touch index.php \
&& php examples/simple-http-client.php http://localhost/

That just creates a clone of this repository, points the PHP error log at the stderr of PID 1 (/proc/1/fd/2) so the error becomes visible, and then launches the example against a local web server started via PHP.

Any idea why the child process isn't stopped properly?

Using object and its dependencies vs problem with serialization

I was trying out the worker pool example. It works very well with simple things, like a PHP function or a simple anonymous function that is not bound to any existing object. Could you please tell me whether it's possible to work with objects and their dependencies when using parallel?

There is always a problem with serialization. I have read the note about the opis/closure library and the restrictions on serializing. I tried to wrap my function call in new SerializableClosure(), but then I get the following exceptions:

[Amp\Parallel\Context\ContextException]                                           
  The context stopped responding, potentially due to a fatal error or calling exit  
                                                                                    
  [Amp\Parallel\Sync\ChannelException]                   
  The channel closed unexpectedly. Did the context die?

Here is an example of what I am trying to execute:

class Something {
    public function test($item)
    {
        var_dump($item);
    }

    public function execute() {
        $results = [];
        $tasks = [
            new BlockingTask(new SerializableClosure(function ($item) {
                $this->test($item);
            }), 5),
        ];

        Loop::run(function () use (&$results, &$tasks) {
            $timer = Loop::repeat(200, function () {
                printf(".");
            });
            Loop::unreference($timer);
            $pool = new DefaultPool;
            $coroutines = [];
            foreach ($tasks as $task) {
                $coroutines[] = function () use ($pool, $task, &$results) {
                    $result = yield $pool->enqueue($task);
                    $url = $task->getArgs()[0];
                    //printf("\nRead from %s: %d bytes\n", $url, strlen($result));
                    $results[$url] = $result;
                };
            }
            $coroutines = array_map(function (callable $coroutine): Coroutine {
                return new Coroutine($coroutine());
            }, $coroutines);
            yield \Amp\Promise\all($coroutines);
            return yield $pool->shutdown();
        });
        echo "\nResult array keys:\n";
        echo var_export(array_keys($results), true);
    }
}

Of course, the tricky part is building the new BlockingTask(). Is it generally possible to pass a method of a concrete class as the callable to parallel and run it successfully?
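As background, PHP cannot serialize a closure natively, while a callable string naming a static method plus plain arguments serializes without trouble. The sketch below contrasts the two in a single process. `Describer` is a hypothetical class, and a real worker would additionally need to be able to autoload it.

```php
<?php
// Hypothetical class standing in for application code that the
// receiving side can load on its own.
final class Describer
{
    public static function describe(int $item): string
    {
        return "item {$item}";
    }
}

// Native serialization of a closure is rejected by PHP.
try {
    serialize(function ($item) {
        return $item;
    });
    $closureSerializable = true;
} catch (Throwable $e) {
    $closureSerializable = false;
}

// A callable string and scalar arguments are plain data, so they
// survive a serialize/unserialize round trip.
$payload = serialize(['callable' => 'Describer::describe', 'args' => [5]]);
$job = unserialize($payload);
$result = call_user_func_array($job['callable'], $job['args']);
```

The design point is that only data crosses the process boundary; the behaviour is looked up by name on the other side instead of being shipped along with a bound `$this`.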

Type error

Got this error recently:

Argument 3 passed to Amp\Parallel\Worker\TaskException::__construct() 
must be of the type integer, string given, 
called in .../vendor/amphp/parallel/lib/Worker/Internal/TaskFailure.php on line 54

This happens because PDOException::getCode() can return a string.
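The mismatch can be reproduced without a database: `Exception::getCode()` returns whatever is stored in the protected `$code` property, and PDOException stores an SQLSTATE string there. A minimal sketch of normalising the code before passing it to an int-typed constructor; `SqlStateException` and `intCode()` are hypothetical names, and falling back to 0 is an assumption about what is acceptable, not the library's actual fix.

```php
<?php
// Mimics how PDOException ends up with a non-integer code.
final class SqlStateException extends RuntimeException
{
    public function __construct(string $message, string $sqlState)
    {
        parent::__construct($message);
        $this->code = $sqlState; // e.g. "42S02"
    }
}

/** Returns the exception code if it is an int, otherwise 0. */
function intCode(Throwable $exception): int
{
    $code = $exception->getCode();

    return is_int($code) ? $code : 0;
}

$fromSqlState = intCode(new SqlStateException('Table not found', '42S02'));
$fromInt = intCode(new RuntimeException('Plain failure', 7));
```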

Fork alternative on 0.2?

Since 0.2:

The forking context has been completely removed. Forking the entire process state proved to be too error-prone to be useful as a general-purpose parallel execution context.

What is the alternative to that if I want to keep using pcntl? I can't install pthread.

Regards.

Debug mode

Moving my question from box-project/box#53 (comment) to here.

I think it would be good to be able to easily switch from parallelMap() to a plain old array_map(), be it for debugging purposes or simply because parallelMap() does not work in a special case, e.g. when the passed argument is not serializable.

Maybe an env variable would work. The inconvenience is that it relies on global state; the benefit is that it would be easier and would not require many changes.

In the same vein, the major downside I have with this library is the inability to use breakpoints for the parallelised work. Would such a switch make that possible?

If you have some insight regarding this, that would be greatly appreciated; otherwise I'll try to take a stab at it in the coming months :)
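A user-land version of such a switch could look roughly like the sketch below. Both the wrapper `mapMaybeParallel()` and the environment variable `APP_NO_PARALLEL` are hypothetical names, and the parallel implementation is injected as a callable so the sketch makes no assumption about the signature of parallelMap().

```php
<?php
/**
 * Map $items with $fn, either sequentially or through the supplied
 * parallel implementation, depending on an environment variable.
 * Hypothetical helper; APP_NO_PARALLEL is an invented variable name.
 */
function mapMaybeParallel(array $items, callable $fn, callable $parallelMap): array
{
    if (getenv('APP_NO_PARALLEL') === '1') {
        // Sequential path: runs in this process, so breakpoints work
        // and nothing needs to be serializable.
        return array_map($fn, $items);
    }

    return $parallelMap($items, $fn);
}

putenv('APP_NO_PARALLEL=1');
$doubled = mapMaybeParallel(
    [1, 2, 3],
    function (int $n): int {
        return $n * 2;
    },
    function (array $items, callable $fn): array {
        // Placeholder for the real parallel call; not reached here.
        throw new LogicException('parallel path should be skipped');
    }
);
```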

How to cancel a task by timeout

Hi. How can I cancel a task after a timeout?
Currently I do it this way:

			$worker = $this->pool->getWorker();
			$promise = $worker->enqueue(new Executor($job));
			$token = new TimeoutCancellationToken(60000);
			$token->subscribe([$worker, 'cancel']);

Will it work as expected?

Worker kill does not work

Hi. I wrote some simple test code:

		$task = new \application\Test();
		\Amp\Loop::repeat(100, function($reference) use($task){
			\Amp\Loop::disable($reference);
			$pool = \Amp\Parallel\Worker\pool();
			echo "Get worker";
			$worker = $pool->getWorker();
			try {
				echo 'enqueue task';
				$promise = $worker->enqueue($task);
				$canceller = new \Amp\TimeoutCancellationToken(1000);
				$id = $canceller->subscribe(function() use($worker){
					echo "Killing";
					$worker->kill();
				});
				echo "Waiting";
				$result = yield $promise;
				echo "done";
				$canceller->unsubscribe($id);
			} catch(Throwable $exception) {
				echo $exception->getMessage();
			}
			echo 'Result is:' . $result;
			if(!$worker->isRunning()) {
				echo "Worker not running";
			}
			\Amp\Loop::enable($reference);
		});
		\Amp\Loop::run();

and Test is:

class Test implements Task
{
	/**
	 * @inheritDoc
	 */
	public function run(Environment $environment)
	{
		sleep(30);
		return 'ok';
	}

}

I expect that the worker will be killed after 1 second and the "enqueue" promise will fail. Instead, the promise succeeds after 30 seconds.

Parallel call on user defined function

Is there any way to call a user-defined function in parallel?

<?php
require \dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Parallel\Worker;
use Amp\Promise;
$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];
$promises = [];
foreach ($urls as $url) {
    $promises[$url] = Worker\enqueueCallable('file_get_contents', $url);
}
$responses = Promise\wait(Promise\all($promises));
foreach ($responses as $url => $response) {
    \printf("Read %d bytes from %s\n", \strlen($response), $url);
}

Like this:

<?php

require 'vendor/autoload.php';

use Amp\Parallel\Worker;
use Amp\Promise;
$urls = [
    'https://secure.php.net',
    'https://amphp.org',
    'https://github.com',
];

$promises = [];
foreach ($urls as $url) {

    $promises[$url] = Worker\enqueueCallable('call_user_func','_subFunction');
}

$responses = Promise\wait(Promise\all($promises));

foreach ($responses as $url => $response) {
    // Cumulated response
}

function _subFunction()
{
	// User defined function
}

Remove current changelog

The current changelog still seems to be from Icicle and is now intermixed, probably because of search & replace.

Logging

It's worth mentioning that I'm mostly using parallelMap rather than parallel directly, and this may be purely a doc issue.

My main "complaint", by which I mean the pain of checking, monitoring or debugging what is going on when using Amp, is that there is absolutely nothing to see. I see a list of workers with their CPU usage, and that's it. There are no logs about how many workers there are, which command is used to start them, or what their workload or status is.

Did I miss something, or is this completely missing? If so, it would be really interesting to investigate how this could be done. I really like how XdebugHandler does it, for example: just plug in your logger and you have everything you need.

Multiple errors encountered {"exception":"[object] (Amp\\MultiReasonException(code: 0)

I don't know what the problem is. This is the full log:

[2019-02-25 16:07:17] local.ERROR: Multiple errors encountered {"exception":"[object] (Amp\\MultiReasonException(code: 0): Multiple errors encountered at /var/www/vendor/amphp/parallel-functions/src/functions.php:56)
[stacktrace]
#0 [internal function]: Amp\\ParallelFunctions\\{closure}()
#1 /var/www/vendor/amphp/amp/lib/Coroutine.php(76): Generator->send(Array)
#2 /var/www/vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\\Coroutine->Amp\\{closure}(NULL, Array)
#3 /var/www/vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve(Array)
#4 /var/www/vendor/amphp/amp/lib/functions.php(466): Amp\\Deferred->resolve(Array)
#5 /var/www/vendor/amphp/amp/lib/Internal/ResolutionQueue.php(55): Amp\\Promise\\{closure}(Object(Amp\\Parallel\\Worker\\TaskError), NULL)
#6 /var/www/vendor/amphp/amp/lib/Failure.php(29): Amp\\Internal\\ResolutionQueue->__invoke(Object(Amp\\Parallel\\Worker\\TaskError), NULL)
#7 /var/www/vendor/amphp/amp/lib/Internal/Placeholder.php(125): Amp\\Failure->onResolve(Object(Amp\\Internal\\ResolutionQueue))
#8 /var/www/vendor/amphp/amp/lib/Coroutine.php(81): Amp\\Coroutine->resolve(Object(Amp\\Failure))
#9 /var/www/vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\\Coroutine->Amp\\{closure}(NULL, Object(Amp\\Parallel\\Worker\\Internal\\TaskFailure))
#10 /var/www/vendor/amphp/amp/lib/Coroutine.php(81): Amp\\Coroutine->resolve(Object(Amp\\Parallel\\Worker\\Internal\\TaskFailure))
#11 /var/www/vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\\Coroutine->Amp\\{closure}(NULL, Object(Amp\\Parallel\\Worker\\Internal\\TaskFailure))
#12 /var/www/vendor/amphp/amp/lib/Coroutine.php(81): Amp\\Coroutine->resolve(Object(Amp\\Parallel\\Worker\\Internal\\TaskFailure))
#13 /var/www/vendor/amphp/amp/lib/Internal/Placeholder.php(130): Amp\\Coroutine->Amp\\{closure}(NULL, '\\x00\\x03\\x13\\x00\\x00O:40:\"Amp\\\\...')
#14 /var/www/vendor/amphp/amp/lib/Deferred.php(45): class@anonymous->resolve('\\x00\\x03\\x13\\x00\\x00O:40:\"Amp\\\\...')
#15 /var/www/vendor/amphp/byte-stream/lib/ResourceInputStream.php(99): Amp\\Deferred->resolve('\\x00\\x03\\x13\\x00\\x00O:40:\"Amp\\\\...')
#16 /var/www/vendor/amphp/amp/lib/Loop/NativeDriver.php(206): Amp\\ByteStream\\ResourceInputStream::Amp\\ByteStream\\{closure}('bg', Resource id #546, NULL)
#17 /var/www/vendor/amphp/amp/lib/Loop/NativeDriver.php(97): Amp\\Loop\\NativeDriver->selectStreams(Array, Array, -0.001)
#18 /var/www/vendor/amphp/amp/lib/Loop/Driver.php(134): Amp\\Loop\\NativeDriver->dispatch(true)
#19 /var/www/vendor/amphp/amp/lib/Loop/Driver.php(72): Amp\\Loop\\Driver->tick()
#20 /var/www/vendor/amphp/amp/lib/Loop.php(84): Amp\\Loop\\Driver->run()
#21 /var/www/vendor/amphp/amp/lib/functions.php(170): Amp\\Loop::run(Object(Closure))
#22 /var/www/vendor/spatie/laravel-collection-macros/src/macros/parallelMap.php(31): Amp\\Promise\\wait(Object(Amp\\Coroutine))
#23 [internal function]: Illuminate\\Database\\Eloquent\\Collection->{closure}(Object(Closure))
#24 /var/www/vendor/laravel/framework/src/Illuminate/Support/Traits/Macroable.php(107): call_user_func_array(Object(Closure), Array)
#25 /var/www/app/Http/Controllers/EdgesController.php(204): Illuminate\\Support\\Collection->__call('parallelMap', Array)
#26 [internal function]: App\\Http\\Controllers\\EdgesController->path(Object(App\\Http\\Requests\\PathRequest))
#27 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Controller.php(54): call_user_func_array(Array, Array)
#28 /var/www/vendor/laravel/framework/src/Illuminate/Routing/ControllerDispatcher.php(45): Illuminate\\Routing\\Controller->callAction('path', Array)
#29 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Route.php(212): Illuminate\\Routing\\ControllerDispatcher->dispatch(Object(Illuminate\\Routing\\Route), Object(App\\Http\\Controllers\\EdgesController), 'path')
#30 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Route.php(169): Illuminate\\Routing\\Route->runController()
#31 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Router.php(665): Illuminate\\Routing\\Route->run()
#32 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(30): Illuminate\\Routing\\Router->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#33 /var/www/vendor/barryvdh/laravel-cors/src/HandleCors.php(36): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#34 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Barryvdh\\Cors\\HandleCors->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#35 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#36 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Middleware/SubstituteBindings.php(41): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#37 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Illuminate\\Routing\\Middleware\\SubstituteBindings->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#38 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#39 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(104): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#40 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Router.php(667): Illuminate\\Pipeline\\Pipeline->then(Object(Closure))
#41 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Router.php(642): Illuminate\\Routing\\Router->runRouteWithinStack(Object(Illuminate\\Routing\\Route), Object(Illuminate\\Http\\Request))
#42 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Router.php(608): Illuminate\\Routing\\Router->runRoute(Object(Illuminate\\Http\\Request), Object(Illuminate\\Routing\\Route))
#43 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Router.php(597): Illuminate\\Routing\\Router->dispatchToRoute(Object(Illuminate\\Http\\Request))
#44 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Kernel.php(176): Illuminate\\Routing\\Router->dispatch(Object(Illuminate\\Http\\Request))
#45 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(30): Illuminate\\Foundation\\Http\\Kernel->Illuminate\\Foundation\\Http\\{closure}(Object(Illuminate\\Http\\Request))
#46 /var/www/vendor/fideloper/proxy/src/TrustProxies.php(57): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#47 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Fideloper\\Proxy\\TrustProxies->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#48 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#49 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Middleware/TransformsRequest.php(31): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#50 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Illuminate\\Foundation\\Http\\Middleware\\TransformsRequest->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#51 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#52 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Middleware/TransformsRequest.php(31): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#53 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Illuminate\\Foundation\\Http\\Middleware\\TransformsRequest->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#54 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#55 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Middleware/ValidatePostSize.php(27): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#56 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Illuminate\\Foundation\\Http\\Middleware\\ValidatePostSize->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#57 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#58 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Middleware/CheckForMaintenanceMode.php(62): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#59 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Illuminate\\Foundation\\Http\\Middleware\\CheckForMaintenanceMode->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#60 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#61 /var/www/vendor/barryvdh/laravel-cors/src/HandlePreflight.php(29): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#62 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(151): Barryvdh\\Cors\\HandlePreflight->handle(Object(Illuminate\\Http\\Request), Object(Closure))
#63 /var/www/vendor/laravel/framework/src/Illuminate/Routing/Pipeline.php(53): Illuminate\\Pipeline\\Pipeline->Illuminate\\Pipeline\\{closure}(Object(Illuminate\\Http\\Request))
#64 /var/www/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(104): Illuminate\\Routing\\Pipeline->Illuminate\\Routing\\{closure}(Object(Illuminate\\Http\\Request))
#65 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Kernel.php(151): Illuminate\\Pipeline\\Pipeline->then(Object(Closure))
#66 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Http/Kernel.php(116): Illuminate\\Foundation\\Http\\Kernel->sendRequestThroughRouter(Object(Illuminate\\Http\\Request))
#67 /var/www/public/index.php(55): Illuminate\\Foundation\\Http\\Kernel->handle(Object(Illuminate\\Http\\Request))
#68 {main}
"} 

Error using $output from Symfony Command

Hi, thanks for your effort and for sharing this library.

I have optimized some of our Symfony CLI apps with this parallel library. So far everything works, but I can't use the Symfony Output anymore.

A simple example of a task:

<?php

use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;

class MyTask implements Task {

  private $output;

  public function __construct($output) {
    $this->output = $output;
    $this->output->writeln("OUTPUT INSIDE CONSTRUCT WORKS");
  }

  public function run(Environment $environment) {
    echo "THIS WORKS\n";
    $this->output->writeln("OUTPUT INSIDE RUN BREAKS");
    $sleep = rand(1, 3);
    sleep($sleep); // Blocking call in thread.
    return 0;
  }

}

  

And my Symfony Command looks like this (I tried to simplify it as much as possible):

<?php

use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Formatter\OutputFormatterStyle;

use Amp\Loop;
use Amp\Parallel\Worker\DefaultPool;

use MyTask;

class MyCommand extends Command {

  protected function configure(){
    $this->setName("Process:Amp")
         ->setDescription("This will run some parallel processes.")
         ->addArgument('count', InputArgument::REQUIRED, 'Number of concurrent pool processes.');
  }

  protected function execute(InputInterface $input, OutputInterface $output){

    $output->writeln('START COMMAND ...');

    // We can first define tasks and then run them
    $tasks = [
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
      new MyTask($output),
    ];

    try {

      // get parameters
      $cnt_parallels = $input->getArgument('count');

      $pool = new DefaultPool($cnt_parallels);

      foreach ($tasks as $task) {
        $pool->enqueue($task);
      }

      // Event loop for parallel tasks
      Loop::run(function () use (&$pool) {
        return $pool->shutdown();
      });

    }
    finally {

      $output->writeln('WELL, COMMAND Finally reached!');

    }

  }

}

  

The above stops immediately without showing any exception.

If I change the loop code to use coroutines, the following exception is shown, formatted by Symfony:

In TaskFailure.php line 55:

Uncaught Symfony\Component\Console\Exception\RuntimeException in worker with message "Unable to write output." and code "0"

  

Thanks in advance for any hints or a solution.
Cheers, Tom
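
One likely explanation, assuming the task object is serialized before it is handed to the worker: a console output object wraps a stream resource, and PHP resources do not survive serialization. The sketch below illustrates only that PHP behaviour. StreamHolder is a hypothetical stand-in for the Symfony output object, and neither Symfony nor amphp/parallel is used.

```php
<?php

// Hypothetical stand-in for an output object that wraps a stream resource.
final class StreamHolder
{
    public $stream;

    public function __construct($stream)
    {
        $this->stream = $stream;
    }
}

// In the parent process the holder owns a real, writable stream.
$holder = new StreamHolder(fopen('php://memory', 'w'));

// Simulate handing the object to another process (assumption: the task
// is serialized on the way to the worker).
$copy = unserialize(serialize($holder));

var_dump(is_resource($holder->stream)); // bool(true)
var_dump(is_resource($copy->stream));   // bool(false): the resource is lost
```

If this is indeed the cause, one possible workaround is to keep $output out of the task entirely: let run() return the text to print and have the parent process write it with $output->writeln().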

BasicEnvironment leaks watcher

This is due to a missing __destruct + the cyclic reference in the loop watcher. I have fixed this in my local ext-async branch, but not in master yet.

Fails on Windows if path to this lib contains non-ascii chars

On fresh git clone && composer install:

Správca@GADELAT-PC C:\
$ php parallel-functions\examples\1-simple-function.php
Took 3.409206867218 milliseconds.

Správca@GADELAT-PC C:\
$ mv parallel-functions á

Správca@GADELAT-PC C:\
$ php á\examples\1-simple-function.php
PHP Fatal error:  Uncaught Amp\MultiReasonException: Multiple errors encountered in C:\á\src\functions.php:59
Stack trace:
#0 [internal function]: Amp\ParallelFunctions\{closure}()
#1 C:\á\vendor\amphp\amp\lib\Coroutine.php(74): Generator->send(Array)
#2 C:\á\vendor\amphp\amp\lib\Internal\Placeholder.php(127): Amp\Coroutine->Amp\{closure}(NULL, Array)
#3 C:\á\vendor\amphp\amp\lib\Deferred.php(41): class@anonymous->resolve(Array)
#4 C:\á\vendor\amphp\amp\lib\functions.php(415): Amp\Deferred->resolve(Array)
#5 C:\á\vendor\amphp\amp\lib\Internal\ResolutionQueue.php(51): Amp\Promise\{closure}(Object(Amp\Parallel\Context\ContextException), NULL)
#6 C:\á\vendor\amphp\amp\lib\Failure.php(26): Amp\Internal\ResolutionQueue->__invoke(Object(Amp\Parallel\Context\ContextException), NULL)
#7 C:\á\vendor\amphp\amp\lib\Internal\Placeholder.php(122): Amp\Failure->onResolve(Object(Amp\Internal\ResolutionQueue))
#8 C:\á\vendor\amphp\amp\lib\Internal\Placeholder.php(151): Amp\Coroutine->resolve(Object(Amp\Failure))
#9 C:\á\vendor\amphp in C:\á\src\functions.php on line 59

Fatal error: Uncaught Amp\MultiReasonException: Multiple errors encountered in C:\á\src\functions.php on line 59

Amp\MultiReasonException: Multiple errors encountered in C:\á\src\functions.php on line 59

Call Stack:
    0.2040     381376   1. {main}() C:\á\examples\1-simple-function.php:0
    0.2550    1547464   2. Amp\Promise\wait() C:\á\examples\1-simple-function.php:14


Správca@GADELAT-PC C:\

I traced the bug to this library thanks to a different error message that I receive in another project where I use Amp:

In Process.php line 177:

  [Amp\Parallel\Context\ContextException]
  The context stopped responding, potentially due to a fatal error or calling exit


Exception trace:
 Amp\Parallel\Context\Process->Amp\Parallel\Context\{closure}() at n/a:n/a
 Generator->throw() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Coroutine.php:71
 Amp\Coroutine->Amp\{closure}() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Failure.php:26
 Amp\Failure->onResolve() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Internal\Placeholder.php:122
 Amp\Coroutine->resolve() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Internal\Placeholder.php:151
 Amp\Coroutine->fail() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Coroutine.php:93
 Amp\Coroutine->Amp\{closure}() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Internal\Placeholder.php:127
 class@anonymous\C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Deferred.php004402D3->resolve() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Deferred.php:41
 Amp\Deferred->resolve() at C:\Users\Správca\src\php\Project\vendor\amphp\byte-stream\lib\ResourceInputStream.php:72
 Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\NativeDriver.php:172
 Amp\Loop\NativeDriver->selectStreams() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\NativeDriver.php:68
 Amp\Loop\NativeDriver->dispatch() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\Driver.php:130
 Amp\Loop\Driver->tick() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\Driver.php:70
 Amp\Loop\Driver->run() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop.php:76
 Amp\Loop::run() at C:\Users\Správca\src\php\Project\bin\project:53
 Closure->{closure}() at n/a:n/a
 call_user_func() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Command\Command.php:250
 Symfony\Component\Console\Command\Command->run() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:865
 Symfony\Component\Console\Application->doRunCommand() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:241
 Symfony\Component\Console\Application->doRun() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:143
 Symfony\Component\Console\Application->run() at C:\Users\Správca\src\php\Project\bin\project:59

In ChannelParser.php line 61:

  [Amp\Parallel\Sync\ChannelException]
  Invalid packet received: Could not open input file: C:\Users\Spr\xc4\x82\xcb\x87vca\src\php\Project\vendor\amphp\parallel\lib\Context/Internal/proces
  s-runner.php\xa


Exception trace:
 Amp\Parallel\Sync\ChannelParser::parser() at n/a:n/a
 Generator->send() at C:\Users\Správca\src\php\Project\vendor\amphp\parser\lib\Parser.php:102
 Amp\Parser\Parser->push() at C:\Users\Správca\src\php\Project\vendor\amphp\parallel\lib\Sync\ChannelledStream.php:71
 Amp\Parallel\Sync\ChannelledStream->Amp\Parallel\Sync\{closure}() at n/a:n/a
 Generator->send() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Coroutine.php:74
 Amp\Coroutine->Amp\{closure}() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Internal\Placeholder.php:127
 class@anonymous\C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Deferred.php004402D3->resolve() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Deferred.php:41
 Amp\Deferred->resolve() at C:\Users\Správca\src\php\Project\vendor\amphp\byte-stream\lib\ResourceInputStream.php:72
 Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\NativeDriver.php:172
 Amp\Loop\NativeDriver->selectStreams() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\NativeDriver.php:68
 Amp\Loop\NativeDriver->dispatch() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\Driver.php:130
 Amp\Loop\Driver->tick() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop\Driver.php:70
 Amp\Loop\Driver->run() at C:\Users\Správca\src\php\Project\vendor\amphp\amp\lib\Loop.php:76
 Amp\Loop::run() at C:\Users\Správca\src\php\Project\bin\project:53
 Closure->{closure}() at n/a:n/a
 call_user_func() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Command\Command.php:250
 Symfony\Component\Console\Command\Command->run() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:865
 Symfony\Component\Console\Application->doRunCommand() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:241
 Symfony\Component\Console\Application->doRun() at C:\Users\Správca\Downloads\symfony\src\Symfony\Component\Console\Application.php:143
 Symfony\Component\Console\Application->run() at C:\Users\Správca\src\php\Project\bin\project:59

Tested on Windows 7

Hi guys

A little documentation or some usage examples would be helpful.

Don't forward exception codes

This is a tracking issue for v0.2, so we don't forget about it. Exception codes shouldn't be passed to the new exception's constructor; they should only be available on the previous exception.

See #19.
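
A minimal sketch of the idea in plain PHP, not the library's actual code: the wrapping exception is created with code 0, and the original code stays reachable through getPrevious(). The function name wrapFailure and the messages are hypothetical.

```php
<?php

// Hypothetical helper: wrap a failure without forwarding its code.
function wrapFailure(\Throwable $previous): \RuntimeException
{
    // The second argument (the code) is deliberately left at 0;
    // the original exception is attached as the previous exception.
    return new \RuntimeException(
        'Task failed: ' . $previous->getMessage(),
        0,
        $previous
    );
}

$original = new \InvalidArgumentException('Bad input', 42);
$wrapped = wrapFailure($original);

echo $wrapped->getCode(), "\n";                // 0
echo $wrapped->getPrevious()->getCode(), "\n"; // 42
```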

*** Error in `php': malloc(): 0x000055c853bafde0 *** etc.

Hello. When I run this code:

for ($i = 0; $i < 1000; $i++) {
    Loop::run(function ()  {
        $threads = [];
        for ($i = 0; $i < 100; $i++) {

            $threads[] = Thread::spawn(function () {
                return rand(1,100);
            });
        }

        foreach ($threads as $thread) {
            $thread->join();
        }
    });
    echo $i . "\n";
}

I get a system error every time, for example:

*** Error in `php': malloc(): memory corruption (fast): 0x0000000002209ee2 ***
Aborted (core dumped)

or

Segmentation fault (core dumped)

or

*** Error in `php': free(): invalid pointer: 0x00005579a7f2157d ***

or

*** Error in `php': corrupted double-linked list: 0x000055ca863cc7e0 ***

This can happen as early as the third iteration or as late as the 1000th. I cannot find any relationship between the number of iterations and when the error occurs.

Could you tell me how I can avoid this problem?

Not working inside a PHAR

After debugging this for a while, I couldn't find a solution or pinpoint the real issue, so I'm raising it here...

For Box, when you rename its PHAR from box.phar to box, i.e. to a name without the .phar extension, it stops being usable with Amp Parallel.

When Amp spawns workers, it seems to require the Composer autoloader, but that's all. Since Box needs more than that in the spawned workers, I'm registering a bootstrap closure which I call in the worker.

Now there are two issues when the PHAR is renamed to box:

  • It cannot find the Composer autoloader. This is actually not an issue, because Amp already loads it; even if the file were found, it would end in a fatal error from trying to declare the Composer classes again. So the fix here is to remove it, as done in https://github.com/humbug/box/pull/160/files
  • It cannot find the other functions file, vendor/humbug/php-scoper/src/functions.php

That's where I'm not sure what Amp does with the temporary files when it tries to execute the workers from inside a PHAR. It seems this part is not executed from inside the PHAR, so I can't use phar://box-alias.phar/vendor/humbug/php-scoper/src/functions.php to point to functions.php (the Box PHAR does not have an alias yet, but I tried with one).

Any idea?

Development tags

Would there be any chance this library (and its amp dependencies) could be tagged following semver guidelines? The only way I can safely add this library without worrying about BC-breaking changes is to manually specify a commit and a minimum-stability of dev.

    "minimum-stability": "dev",
    "prefer-stable": true,
    "require" : {
        "amphp/loop": "dev-master#302a5c714c2700d32ca9444be09f21b2835b213f",
        "amphp/amp": "2.0.x-dev#72378e2b82d9f1dee6b93de2bee8aa138e7f67e1",
        "amphp/parallel": "0.1.x-dev#9cc70c1f86bf8ffa8ca744e40702502410b733f0",
        "async-interop/promise": "^0.4"
    }

This is the only configuration I could find which works for me and is safe. Any suggestions on how to improve this would be welcome.

Class not found

I run code like this:

use Amp\Parallel\Worker;
use Amp\Promise;
use common\services\internal\TigerApi;

        $work = Worker\enqueueCallable([TigerApi::class, 'getPoliciesTitle']);
        $promises = ['policy' => $work];
        $result = Promise\wait(Promise\all($promises));

The output: Uncaught Error in worker with message \"Class 'common\\services\\internal\\TigerApi' not found\" and code \"0\"

How can I make the class autoloadable in the worker?
I am using Yii 2.0 and tried the method from #55, but then Yii itself was not found.

I know that Yii has its own autoloader class. How can I make this package use Yii's autoloader?
