byte-stream's Introduction

amphp/byte-stream

AMPHP is a collection of event-driven libraries for PHP designed with fibers and concurrency in mind. amphp/byte-stream specifically provides a stream abstraction to ease working with various byte streams.

Installation

This package can be installed as a Composer dependency.

composer require amphp/byte-stream

Requirements

This package requires PHP 8.1 or later.

Usage

Streams are an abstraction over ordered sequences of bytes. This package provides the fundamental interfaces ReadableStream and WritableStream.

Note: Previous versions used the terms InputStream and OutputStream, but these terms can be confusing depending on the use case.

ReadableStream

ReadableStream offers a primary method: read(). It returns a string or null. null indicates that the stream has ended.

The following example shows a ReadableStream consumption that buffers the complete stream contents.

$stream = ...;
$buffer = "";

while (($chunk = $stream->read()) !== null) {
    $buffer .= $chunk;
}

// do something with $buffer

Note: Amp\ByteStream\buffer($stream) can be used instead, but we'd like to demonstrate manual consumption here.

This package offers some basic implementations. Other libraries, such as amphp/socket, might provide more.

Payload

Payload implements ReadableStream while also providing a buffer() method for buffering the entire contents. This allows consuming a message either in chunks (streaming) or all at once (buffering). When the object is destructed, any remaining data in the stream is automatically consumed and discarded. This class is useful for small payloads or when the entire contents of a stream are needed before any processing can be done.

Buffering

Buffering a complete readable stream can be accomplished using the buffer() method.

$payload = new Payload($inputStream);
$content = $payload->buffer();

Streaming

Sometimes it's useful and possible to consume a payload in chunks rather than buffering it completely first, e.g. when streaming a large HTTP response body directly to disk.

while (null !== $chunk = $payload->read()) {
    // Use $chunk here, works just like any other ReadableStream
}

ReadableBuffer

A ReadableBuffer allows creating a ReadableStream from a single known string chunk. This is helpful if the complete stream contents are already known.

$stream = new ReadableBuffer("foobar");

It also allows creating a stream without any chunks by passing null as the chunk or omitting the constructor argument:

$stream = new ReadableBuffer;

// The stream ends immediately
assert(null === $stream->read());

ReadableIterableStream

ReadableIterableStream allows converting an iterable that yields strings into a ReadableStream:

$inputStream = new Amp\ByteStream\ReadableIterableStream((function () {
    for ($i = 0; $i < 10; $i++) {
        Amp\delay(1);
        yield ".";
    }
})());

ReadableResourceStream

This package abstracts PHP's stream resources with ReadableResourceStream and WritableResourceStream. They automatically set the passed resource to non-blocking mode and allow reading and writing like any other ReadableStream / WritableStream. They also handle backpressure automatically by disabling the read watcher in case there's no read request and only activate a writability watcher if the underlying write buffer is already full, which makes them very efficient.

DecompressingReadableStream

This package implements compression based on Zlib. CompressingWritableStream can be used for compression, while DecompressingReadableStream can be used for decompression. Both can simply wrap an existing stream to apply them. Both accept an $encoding and $options parameter in their constructor.

$readableStream = new ReadableResourceStream(STDIN);
$decompressingReadableStream = new DecompressingReadableStream($readableStream, \ZLIB_ENCODING_GZIP);

while (null !== $chunk = $decompressingReadableStream->read()) {
    print $chunk;
}

See also: ./examples/gzip-decompress.php

WritableStream

WritableStream offers two primary methods: write() and end().

WritableStream::write

write() writes the given string to the stream. Waiting for completion allows writing only as fast as the underlying stream can write and potentially send over a network. TCP streams will return immediately as long as the write buffer isn't full.

The writing order is always ensured, even if the writer doesn't wait for completion before issuing another write.

WritableStream::end

end() marks the stream as ended. TCP streams might close the underlying stream for writing (a half-close), but MUST NOT close it entirely. Instead, all resources should be freed and actual resource handles be closed by PHP's garbage collection process.

The following example reads from a stream, as in the previous example, and writes all data to a WritableStream:

$readableStream = ...;
$writableStream = ...;

while (($chunk = $readableStream->read()) !== null) {
    $writableStream->write($chunk);
}

$writableStream->end();

Note: Amp\ByteStream\pipe($readableStream, $writableStream) can be used instead, but we'd like to demonstrate manual consumption / writing here.

This package offers some basic implementations. Other libraries, such as amphp/socket, might provide more.

WritableResourceStream

This package abstracts PHP's stream resources with ReadableResourceStream and WritableResourceStream. They automatically set the passed resource to non-blocking mode and allow reading and writing like any other ReadableStream / WritableStream. They also handle backpressure automatically by disabling the read watcher in case there's no read request and only activate a writability watcher if the underlying write buffer is already full, which makes them very efficient.

CompressingWritableStream

This package implements compression based on Zlib. CompressingWritableStream can be used for compression, while DecompressingReadableStream can be used for decompression. Both can simply wrap an existing stream to apply them. Both accept an $encoding and $options parameter in their constructor.

$writableStream = new WritableResourceStream(STDOUT);
$compressedWritableStream = new CompressingWritableStream($writableStream, \ZLIB_ENCODING_GZIP);

for ($i = 0; $i < 100; $i++) {
    $compressedWritableStream->write(bin2hex(random_bytes(32)));
}

$compressedWritableStream->end();

See also: ./examples/gzip-compress.php

Versioning

amphp/byte-stream follows the semver semantic versioning specification like all other amphp packages.

Security

If you discover any security related issues, please email [email protected] instead of using the issue tracker.

License

The MIT License (MIT). Please see LICENSE for more information.

byte-stream's People

Contributors

azjezz, bwoebi, danog, iggyvolz, ivanpepelko, kelunik, nevay, ostrolucky, sebdesign, spomky, thgs, trowski, userqq, villfa, xtrime-ru

byte-stream's Issues

Ability to read particular length from stream

Hi.
I think it would be great to be able to read chunks of a specified length, since dealing with binary protocols commonly looks like this:

$lengthBytes = fread($fd, 4); // read Int32 Length
$length = unpack('Llength', $lengthBytes)['length'];
$packet = fread($fd, $length);

I'm not sure whether InputStream could be made to behave like public function read(?int $length = null): Promise;, but maybe a buffering class similar to LineReader would be useful, for example:

class LengthReader
{
    public function __construct(InputStream $inputStream);

    public function readLine(int $length): Promise;
}
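
The buffering idea can be sketched independently of any stream class. The following is a minimal illustration, not an existing API: the class name LengthPrefixedBuffer and its push() / shift() methods are hypothetical, and the 4-byte header uses the same machine-byte-order 'L' format as the fread snippet above. Chunks of any size go in, complete packets come out.

```php
<?php

/**
 * Hypothetical helper, not part of amphp/byte-stream: collects chunks of
 * arbitrary size and returns complete length-prefixed packets.
 */
final class LengthPrefixedBuffer
{
    private string $buffer = '';

    /** Appends a chunk, e.g. one returned by a stream read. */
    public function push(string $chunk): void
    {
        $this->buffer .= $chunk;
    }

    /** Returns the next complete packet, or null if more data is needed. */
    public function shift(): ?string
    {
        if (\strlen($this->buffer) < 4) {
            return null; // length header is still incomplete
        }

        // 'L' is an unsigned 32-bit integer in machine byte order.
        $length = \unpack('Llength', $this->buffer)['length'];

        if (\strlen($this->buffer) < 4 + $length) {
            return null; // payload is still incomplete
        }

        $packet = \substr($this->buffer, 4, $length);
        $this->buffer = \substr($this->buffer, 4 + $length);

        return $packet;
    }
}

// Usage: two packets arrive cut into 3-byte chunks.
$frames = new LengthPrefixedBuffer();
$wire = \pack('L', 5) . 'hello' . \pack('L', 2) . 'ok';

foreach (\str_split($wire, 3) as $chunk) {
    $frames->push($chunk);

    while (($packet = $frames->shift()) !== null) {
        echo $packet, PHP_EOL;
    }
}
```

In a real reader, push() would be fed from the stream's read loop, and the caller would wait for the next chunk whenever shift() returns null.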

Invalid packet received since update to byte-stream 1.2.4

I've just released my application built on amphp at https://github.com/ostrolucky/stdinho. Just before announcing it to the world, I updated my dependencies, and it no longer works. The latest amphp/byte-stream version it works with reliably is 1.2.2. With 1.2.3 it blocks in such a way that the socket isn't created; 1.2.4 crashes with the following error.

In ChannelParser.php line 61:
                                                                               
  [Amp\Parallel\Sync\ChannelException]                                         
  Invalid packet received: \xce\xb3\x1a\x1b\x1\xe1\xff\x9d\xf8]D\xfb\xbc\xf4\  
  xa8\xb6\xd8\xd6oH\x18\xff\x9a\xa\x98\xb3\xc9\x5t \x7f\x4\xed\xf\xa8\xbf\xc3  
  ?\x86{\xae\xd1\xd5\xbep\xa\xcc\xcbEIM;\xe9\x0\x95\xfb\x86\xebg\x3RwZ\x13\xa  
  6)Q|\x82y0\xa9\x86`\xc0\xd\x13\x82\xda\x99d\xcb\x9af\x9b\x81\xf1\xch\xd1\xa  
  e?\x3\xd0\x94&\\x1dVY\x3\xe1\xfc\xecK\x1dow\x15\xa1\x8a\x89n\xbc\x95t\xa6\x  
  f50\xf5";s:43:"\x0Amp\Parallel\Worker\Internal\TaskResult\x0id";s:2:"km";}   
                                                                               

Exception trace:
 Amp\Parallel\Sync\ChannelParser::parser() at n/a:n/a
 Generator->send() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/parser/lib/Parser.php:102
 Amp\Parser\Parser->push() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/parallel/lib/Sync/ChannelledStream.php:71
 Amp\Parallel\Sync\ChannelledStream->Amp\Parallel\Sync\{closure}() at n/a:n/a
 Generator->send() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Coroutine.php:74
 Amp\Coroutine->Amp\{closure}() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Internal/Placeholder.php:127
 class@anonymous\/media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Deferred.php0x7f2c357fa2d3->resolve() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Deferred.php:41
 Amp\Deferred->resolve() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/byte-stream/lib/ResourceInputStream.php:90
 Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/NativeDriver.php:172
 Amp\Loop\NativeDriver->selectStreams() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/NativeDriver.php:68
 Amp\Loop\NativeDriver->dispatch() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/Driver.php:130
 Amp\Loop\Driver->tick() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop/Driver.php:70
 Amp\Loop\Driver->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/amphp/amp/lib/Loop.php:76
 Amp\Loop::run() at /media/gadelat/sdata/GDrive/src/php/stdinho/bin/stdinho:50
 Closure->{closure}() at n/a:n/a
 call_user_func() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Command/Command.php:250
 Symfony\Component\Console\Command\Command->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:865
 Symfony\Component\Console\Application->doRunCommand() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:241
 Symfony\Component\Console\Application->doRun() at /media/gadelat/sdata/GDrive/src/php/stdinho/vendor/symfony/console/Application.php:143
 Symfony\Component\Console\Application->run() at /media/gadelat/sdata/GDrive/src/php/stdinho/bin/stdinho:56

The codebase is simple; please clone it and try it. I have no idea where the problem could be. It can be reproduced by running the following:

#terminal 1
$ head -c 1G </dev/urandom | bin/stdinho 127.0.0.1:1337 -v
#terminal 2
$ curl localhost:1337 > /dev/null

Error in ResourceOutputStream, fwrite

After the last commit, I get strange errors:

Amp\ByteStream\StreamException: Failed to write to socket Errno: 2; stream_socket_accept(): accept failed: Connection timed out

and

Amp\ByteStream\StreamException: Failed to write to socket Errno: 2; inet_pton(): Unrecognized address 127.0.0.1:11001

After some research I found that the problems come from this:
https://github.com/amphp/byte-stream/blob/master/lib/ResourceOutputStream.php#L71

If I call it without $chunkSize, everything is OK!

Is this a problem in PHP?
I am using this:
PHP 7.1.3 (cli) (built: Mar 17 2017 13:43:06) ( NTS )

Am I the only one getting this bug?
Can someone confirm this?

InputStream throttling

Typically I read from a stream like this: while (($data = yield $stream->read()) !== null), and once I have read null, the stream is closed.

How can I find out that the stream is closed without reading from it?
How can I throttle a stream?

P.S.: I want to implement something like pause() and resume() to permit throttling, which is very useful for testing.

Make read consistent

Reading after the stream has closed should always resolve to null instead of throwing an exception.

feof may hang

I encountered a hanging feof, resulting in StreamException("The stream was closed by the peer").
This happens when making requests to Facebook from within Kubernetes.

Omitting the feof check "fixes" it.

A workaround (and better code) would be to not check @feof before the write, but to set an error handler for the fwrite, catching warnings and converting them to stream exceptions.

Example:

$eh = set_error_handler([$this, 'onSocketWriteError']);

if ($chunkSize) {
    $written = \fwrite($stream, $data, $chunkSize);
} else {
    $written = \fwrite($stream, $data);
}

set_error_handler($eh);
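
The error-handler approach can be tried in isolation. The following is a minimal sketch, assuming a plain PHP stream resource; the function name writeOrThrow is hypothetical and a generic RuntimeException stands in for StreamException. It captures the message inside the handler, so it does not depend on error_get_last(), and it restores the previous handler with restore_error_handler().

```php
<?php

/**
 * Hypothetical helper: writes to a stream resource and converts any PHP
 * warning or notice raised by fwrite() into an exception.
 *
 * @param resource $stream
 */
function writeOrThrow($stream, string $data): int
{
    $error = null;

    \set_error_handler(static function (int $severity, string $message) use (&$error): bool {
        $error = $message; // remember the message instead of relying on error_get_last()

        return true; // skip PHP's default error handling
    });

    try {
        $written = \fwrite($stream, $data);
    } finally {
        \restore_error_handler(); // always reinstate the previous handler
    }

    if ($error !== null || $written === false) {
        throw new \RuntimeException('Failed to write to stream: ' . ($error ?? 'unknown error'));
    }

    return $written;
}

// Usage: a file opened read-only cannot be written to.
$readOnly = \fopen(__FILE__, 'r');

try {
    writeOrThrow($readOnly, 'test');
} catch (\RuntimeException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```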

As soon as ResourceInputStream detects EOF, it ignores subsequent requests to read the resource

Since a4739c8, the following hangs with no output:

<?php

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;

require __DIR__ . "/../vendor/autoload.php";

$writeCoRoutine2 = \Amp\asyncCoroutine(function() {
    $middleReadStream = new ResourceInputStream(fopen('middle', 'r'));

    while (true) {
        echo yield $middleReadStream->read();
    }
});

Loop::run(function () use ($writeCoRoutine2) {
    \Amp\ByteStream\pipe(
        new ResourceInputStream(fopen('/home/g9735/Downloads/somefile.avi', 'r')),
        new ResourceOutputStream(fopen('middle', 'w'))
    );

    $writeCoRoutine2();
});

As soon as read() in $writeCoRoutine2 is executed, the event loop never returns to pipe.

Message hides exceptions

Exceptions are hidden inside Message when streaming, they're fine when using the Promise API.

Consider avoiding stream_socket_shutdown

When file descriptors are shared between processes or threads, stream_socket_shutdown will close the file descriptor for ALL threads/processes (took me forever to figure out what was going on) and cause broken pipes.

ResourceOutputStream treats open stream as closed

Here is a problem which occurs if the script is run from systemd

Amp\Loop::run(function() {
    $resourceOutputStream = new ByteStream\ResourceOutputStream(\STDOUT);
    // Throws Amp\ByteStream\StreamException: The stream was closed by the peer
    yield $resourceOutputStream->write('test');
});

I figured out that it stops happening if the feof check of the stream is removed in ResourceOutputStream.php:74. Apparently it is not correct to use feof to check whether a stream is open, but I am not sure what can be done here, as the check was added for a reason.

InvalidWatcherError in ResourceOutputStream

I'm getting the following exception while having a look at amphp/socket#32:

PHP Fatal error:  Uncaught Amp\Loop\InvalidWatcherError: Cannot enable an invalid watcher identifier: 'byb' in /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php:369
Stack trace:
#0 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop.php(210): Amp\Loop\Driver->enable('byb')
#1 /home/kelunik/GitHub/amphp/socket/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(188): Amp\Loop::enable('byb')
#2 /home/kelunik/GitHub/amphp/socket/vendor/amphp/byte-stream/lib/ResourceOutputStream.php(130): Amp\ByteStream\ResourceOutputStream->send('test', false)
#3 /home/kelunik/GitHub/amphp/socket/lib/Socket.php(67): Amp\ByteStream\ResourceOutputStream->write('test')
#4 /home/kelunik/GitHub/amphp/socket/test.php(35): Amp\Socket\Socket->write('test')
#5 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/NativeDriver.php(101): {closure}('byc', NULL)
#6 /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php(130): Amp\Loop\NativeDriver->dispatch(true)
#7 /home/kelunik/GitHub/amphp/socket/vendor/a in /home/kelunik/GitHub/amphp/socket/vendor/amphp/amp/lib/Loop/Driver.php on line 369

server.php

<?php

require __DIR__ . "/vendor/autoload.php";

use Amp\Socket\ServerSocket;
use function Amp\asyncCoroutine;

\Amp\Loop::run(function () {
    \Amp\Loop::repeat(5 * 1000, function () {
        echo "current RAM: " . round((memory_get_usage() / 1024 / 1024), 2) . "MB, peak RAM: " . round((memory_get_peak_usage() / 1024 / 1024), 2) . "MB\n";
    });

    $clientHandler = asyncCoroutine(function (ServerSocket $client) {
        echo "client connected" . PHP_EOL;

        while (($chunk = yield $client->read()) !== null) {
            echo "client data -> " . strlen($chunk) . PHP_EOL;
        }

        echo "client disconnected" . PHP_EOL;
    });

    $server = \Amp\Socket\listen("127.0.0.1:12001");

    while ($client = yield $server->accept()) {
        // $clientHandler($client);
    }
});

client.php

<?php

require __DIR__ . "/vendor/autoload.php";

\Amp\Loop::run(function () {
    echo "Driver -> " . get_class(\Amp\Loop::get()) . "\n";

    \Amp\Loop::repeat(5 * 1000, function () {
        echo "current RAM: " . round((memory_get_usage() / 1024 / 1024), 2) . "MB, peak RAM: " . round((memory_get_peak_usage() / 1024 / 1024), 2) . "MB\n";
    });

    for ($i = 0; $i < 1000; $i++) {
        client($i);
    }
});

/**
 * @param $i
 *
 * @return \Amp\Promise<\Amp\Socket\ClientSocket>
 */
function client($i)
{
    return \Amp\call(function () use ($i) {
        /**
         * @var $client \Amp\Socket\ClientSocket
         */
        $client = yield \Amp\Socket\connect("127.0.0.1:12001");

        \Amp\Loop::repeat(1000, function () use ($client, $i) {
            $data = "test";

            echo "write $i data -> " . strlen($data) . PHP_EOL;

            $client->write($data);
        });

        return $client;
    });
}

Write on full buffer

From docs:

  1. Streams are essentially “always” writable. The only time they aren’t is when their respective write buffers are full.

If I understand correctly: this code is used when the buffers are NOT full; if the buffers are full, this code is used to schedule the writes, and this code writes them.

  • How can I simulate such a situation? I am really curious to test this behavior.
  • What are the advantages of such an implementation? Why not just loop fwrite()?
Low bandwidth v2.0.0-beta.13, v2.0.0-beta.14

I use Ridge V2 for RabbitMQ.
When I upgrade the package from v2.0.0-beta.12 to v2.0.0-beta.14, the message processing rate on the production server drops sharply (see point 1 in the figure). When I return to v2.0.0-beta.12 (see point 2 in the figure), the processing speed returns to the required value.
[Screenshot 2022-12-28 at 16 12 56]

100% CPU with PHP built-in streams

php://temp and php://memory streams result in 100% CPU usage. It's probably due to them not being actual streams that can be used with stream_select, so there should probably be a test for that.

https://gist.github.com/umbri/2a3e558d487b5e6756768547e660f95a

<?php

use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;

require_once __DIR__ . "/../vendor/autoload.php";

Loop::run(function () {
    $memory = fopen("php://temp", "r+");

    $stdin = new ResourceInputStream($memory);
    $stdout = new ResourceOutputStream($memory);

    Loop::delay(1000, function () use ($stdout) {
        echo "write" . PHP_EOL;

        $stdout->write("Hello, World!");
    });

    while (($chunk = yield $stdin->read()) !== null) {
        echo "data -> $chunk" . PHP_EOL;
    }
});

ResourceOutputStream interrupts writing without consumer giving information what was written/unwritten

I'm dealing with a situation where the following line

throw new StreamException($message);

throws an exception but doesn't expose what was written and what is left to write, which makes it very difficult to handle an incomplete chunk write. My aim is to handle such an exception gracefully and re-write the data to a different stream.

I suggest creating a custom exception that carries the $data that could not be written.
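
The suggestion could look roughly like the sketch below. It is an illustration only: PartialWriteException, getUnwrittenData() and the failingWrite() stand-in are hypothetical names, and the exception extends RuntimeException rather than the library's StreamException so that the snippet runs on its own.

```php
<?php

/**
 * Hypothetical exception type, not part of amphp/byte-stream: carries the
 * bytes that could not be written so the caller can redirect them.
 */
final class PartialWriteException extends \RuntimeException
{
    public function __construct(
        string $message,
        private readonly string $unwrittenData,
        ?\Throwable $previous = null,
    ) {
        parent::__construct($message, 0, $previous);
    }

    public function getUnwrittenData(): string
    {
        return $this->unwrittenData;
    }
}

/**
 * Stand-in for a failing write: pretends that only the first $accepted
 * bytes reached the stream before the failure.
 */
function failingWrite(string $data, int $accepted): void
{
    throw new PartialWriteException('Failed to write to stream', \substr($data, $accepted));
}

try {
    failingWrite('hello world', 5);
} catch (PartialWriteException $e) {
    // Re-route the remainder to a different destination.
    $fallback = \fopen('php://memory', 'w+');
    \fwrite($fallback, $e->getUnwrittenData());
    \rewind($fallback);

    echo \stream_get_contents($fallback), PHP_EOL;
}
```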

InputStreamChain

I needed to combine the output of two input streams without leaking null from the previous stream. I abandoned this idea in the end, but the code is already written, and I wonder whether you would be interested in me contributing it here. Usage is as follows:

public function testInputStreamChain()
{
    $chain = new InputStreamChain(new InMemoryStream('foo'), new InMemoryStream(null), new InMemoryStream('bar'));

    self::assertEquals('foo', wait($chain->read()));
    self::assertEquals(null, wait($chain->read()));
    self::assertEquals('bar', wait($chain->read()));
}
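
The chaining logic itself is small. The following is a synchronous sketch under stated assumptions: ChunkSource, ArraySource and SourceChain are hypothetical stand-ins, not the proposed InputStreamChain, and read() returns a chunk directly instead of a Promise. The chain moves on to the next source when the current one returns null and only reports null once every source has ended.

```php
<?php

/** Minimal stand-in for a readable stream: read() returns a chunk, or null at the end. */
interface ChunkSource
{
    public function read(): ?string;
}

/** Serves a fixed list of chunks, then ends. */
final class ArraySource implements ChunkSource
{
    /** @param list<string> $chunks */
    public function __construct(private array $chunks)
    {
    }

    public function read(): ?string
    {
        return \array_shift($this->chunks); // null once the list is empty
    }
}

/** Reads the given sources one after another as a single source. */
final class SourceChain implements ChunkSource
{
    /** @var list<ChunkSource> */
    private array $sources;

    public function __construct(ChunkSource ...$sources)
    {
        $this->sources = $sources;
    }

    public function read(): ?string
    {
        while ($this->sources) {
            $chunk = $this->sources[0]->read();

            if ($chunk !== null) {
                return $chunk;
            }

            \array_shift($this->sources); // current source ended, move on
        }

        return null; // every source has ended
    }
}

// Usage: the empty source in the middle does not surface as null.
$chain = new SourceChain(new ArraySource(['foo']), new ArraySource([]), new ArraySource(['bar']));

while (($chunk = $chain->read()) !== null) {
    echo $chunk, PHP_EOL;
}
```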

ReadableStream: Clear data from buffer on read

I am using the ReadableIterableStream and it is awesome, but I am reading a very big list of large things. I am reading as quickly as the data comes in, but I'm still getting errors when I exceed the buffer size.

I can make the buffer bigger (and bigger and bigger -- 300MB now but will likely exceed 1GB) but chewing up all that ram for data I've already read is a waste and seems to impair performance.

ConcurrentQueueIterator could delete data as it's read pretty easily, it looks like to me; FiberLocal supports that operation already. There's nothing in the call chain that suggests to me that this option is already available, however.

In the most common use cases, once data is read it is no longer needed in the buffer - it has been buffed. A way to prevent this buildup would be welcome.

Error in Amp\ByteStream\splitLines()

$cancellation is not passed to split(). Apparently, this is not how it should be.

/**
 * Splits the stream into lines.
 *
 * @return \Traversable<int, string>
 */
function splitLines(ReadableStream $source, ?Cancellation $cancellation = null): \Traversable
{
    foreach (split($source, "\n") as $line) {
        yield \rtrim($line, "\r");
    }
}

Invalid watcher issue in ResourceInputStream

The ResourceInputStream implementation is buggy, which results in InvalidWatcherErrors being thrown in amphp/http-client.

AMP_DEBUG_TRACE_WATCHERS=true ./vendor/bin/phpunit test/ClientHttpBinIntegrationTest.php
PHPUnit 8.4.1 by Sebastian Bergmann and contributors.

.................................E                                34 / 34 (100%)

Time: 31.25 seconds, Memory: 65.84 MB

There was 1 error:

1) Amp\Http\Client\ClientHttpBinIntegrationTest::testConcurrentSlowNetworkInterceptor
Amp\Loop\InvalidWatcherError: Cannot reference an invalid watcher identifier: 'o'

Creation Trace:
#0 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:183 Amp\Loop\TracingDriver->onReadable()
#1 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:102 Amp\Loop::onReadable()
#2 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:72 Amp\ByteStream\ResourceInputStream->__construct()
#3 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:40 Amp\Socket\ResourceSocket->__construct()
#4 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/DnsConnector.php:122 Amp\Socket\ResourceSocket::fromClientSocket()
#5 Amp\Socket\DnsConnector::Amp\Socket\{closure}()
#6 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105 Generator->send()
#7 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:43 Amp\Coroutine->Amp\{closure}()
#8 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/PrivatePromise.php:23 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->onResolve()
#9 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:125 Amp\Internal\PrivatePromise->onResolve()
#10 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#11 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/functions.php:233 Amp\Deferred->resolve()
#12 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130 Amp\Promise\{closure}()
#13 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#14 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:229 Amp\Deferred->resolve()
#15 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:97 Amp\Loop\NativeDriver->selectStreams()
#16 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134 Amp\Loop\NativeDriver->dispatch()
#17 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72 Amp\Loop\Driver->tick()
#18 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22 Amp\Loop\Driver->run()
#19 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84 Amp\Loop\TracingDriver->run()
#20 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64 Amp\Loop::run()
#21 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1400 Amp\PHPUnit\AsyncTestCase->runAsyncTest()
#22 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36 PHPUnit\Framework\TestCase->runTest()
#23 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1020 Amp\PHPUnit\AsyncTestCase->runTest()
#24 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestResult.php:691 PHPUnit\Framework\TestCase->runBare()
#25 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:752 PHPUnit\Framework\TestResult->run()
#26 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestSuite.php:569 PHPUnit\Framework\TestCase->run()
#27 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/TestRunner.php:616 PHPUnit\Framework\TestSuite->run()
#28 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:200 PHPUnit\TextUI\TestRunner->doRun()
#29 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:159 PHPUnit\TextUI\Command->run()
#30 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/phpunit:61 PHPUnit\TextUI\Command::main()

Cancel Trace:
#0 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:294 Amp\Loop\TracingDriver->cancel()
#1 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:141 Amp\Loop::cancel()
#2 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:140 Amp\ByteStream\ResourceInputStream->read()
#3 /home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:469 Amp\Socket\ResourceSocket->read()
#4 Amp\Http\Client\Connection\Http2Connection->run()
#5 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105 Generator->send()
#6 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130 Amp\Coroutine->Amp\{closure}()
#7 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php:45 class@anonymous/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Deferred.php0x7fe1ece202d7->resolve()
#8 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:101 Amp\Deferred->resolve()
#9 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:198 Amp\ByteStream\ResourceInputStream::Amp\ByteStream\{closure}()
#10 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:97 Amp\Loop\NativeDriver->selectStreams()
#11 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134 Amp\Loop\NativeDriver->dispatch()
#12 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72 Amp\Loop\Driver->tick()
#13 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22 Amp\Loop\Driver->run()
#14 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84 Amp\Loop\TracingDriver->run()
#15 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64 Amp\Loop::run()
#16 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1400 Amp\PHPUnit\AsyncTestCase->runAsyncTest()
#17 /home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36 PHPUnit\Framework\TestCase->runTest()
#18 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:1020 Amp\PHPUnit\AsyncTestCase->runTest()
#19 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestResult.php:691 PHPUnit\Framework\TestCase->runBare()
#20 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestCase.php:752 PHPUnit\Framework\TestResult->run()
#21 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/Framework/TestSuite.php:569 PHPUnit\Framework\TestCase->run()
#22 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/TestRunner.php:616 PHPUnit\Framework\TestSuite->run()
#23 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:200 PHPUnit\TextUI\TestRunner->doRun()
#24 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/src/TextUI/Command.php:159 PHPUnit\TextUI\Command->run()
#25 /home/kelunik/GitHub/amphp/http-client/vendor/phpunit/phpunit/phpunit:61 PHPUnit\TextUI\Command::main()

/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:132
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:311
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/byte-stream/lib/ResourceInputStream.php:225
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/socket/src/ResourceSocket.php:170
/home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:333
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:60
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/functions.php:66
/home/kelunik/GitHub/amphp/http-client/src/Connection/Http2Connection.php:435
/home/kelunik/GitHub/amphp/http-client/src/Connection/HttpStream.php:49
/home/kelunik/GitHub/amphp/http-client/src/Internal/InterceptedStream.php:44
/home/kelunik/GitHub/amphp/http-client/src/Interceptor/ModifyRequest.php:31
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:105
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Coroutine.php:110
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Internal/Placeholder.php:130
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Delayed.php:22
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:47
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/NativeDriver.php:121
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:134
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/Driver.php:72
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop/TracingDriver.php:22
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/amp/lib/Loop.php:84
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:64
/home/kelunik/GitHub/amphp/http-client/vendor/amphp/phpunit-util/src/AsyncTestCase.php:36

ERRORS!
Tests: 34, Assertions: 72, Errors: 1.

Issue with suppressed errors and empty error_get_last

$written = @\fwrite($stream, $data, $chunkSize);

I recently debugged an issue in the IpcLogger of aerys. It hung at 100% CPU because the socket was not detected as dead. In my setup, @fwrite does NOT populate error_get_last for some reason. It was also not populated when the registered error handler returned 'true'.

The only really reliable solution (in IpcLogger) looked like this:

        $eh = set_error_handler([$this, 'onDeadIpcSock']);
        $bytes = fwrite($this->ipcSock, $this->writeBuffer);
        set_error_handler($eh);

If it affects that code, then I am pretty sure it also affects the byte-stream code, right?
I'm not sure whether this is a bug in PHP, because in other places @ / error_reporting(0) DID populate error_get_last.

Any other ideas for catching fwrite errors in a better way?
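
One possible shape for this, as a minimal sketch: wrap the write in a temporary error handler that records the message itself, so nothing depends on error_get_last(). The helper name writeCapturingError is hypothetical and not part of amphp/byte-stream or aerys.

```php
<?php

/**
 * Hypothetical helper (not part of amphp/byte-stream): writes $data to $stream
 * and returns [bytes written or false, captured error message or null].
 *
 * @param resource $stream
 */
function writeCapturingError($stream, string $data): array
{
    $error = null;

    // Temporarily replace the error handler so the message is recorded
    // directly instead of relying on error_get_last().
    set_error_handler(static function (int $errno, string $errstr) use (&$error): bool {
        $error = $errstr;
        return true; // treat the error as handled
    });

    try {
        $written = fwrite($stream, $data);
    } finally {
        // Always restore the previous handler, even if fwrite() throws.
        restore_error_handler();
    }

    return [$written, $error];
}

// Usage: a successful write reports the byte count and no error.
$stream = fopen('php://memory', 'w+');
[$written, $error] = writeCapturingError($stream, 'hello');
```

Using restore_error_handler() in a finally block avoids having to keep the previous handler in a variable, at the cost of one extra closure per write.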

Modify chunk size on the fly

I would like to change the chunk size during the lifecycle of a Resource(Input|Output)Stream object, not just when constructing it. Can we have a chunk size mutator, or an optional argument in read?
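
To make the request concrete, here is a rough sketch of both variants on a standalone class. ChunkedReader, setChunkSize() and the $limit argument are hypothetical names used for illustration only; this is not the library's implementation and it reads synchronously with fread().

```php
<?php

/**
 * Hypothetical illustration of a reader whose chunk size can change
 * after construction. Not the amphp/byte-stream implementation.
 */
final class ChunkedReader
{
    /** @var resource */
    private $resource;
    private int $chunkSize;

    /** @param resource $resource */
    public function __construct($resource, int $chunkSize = 8192)
    {
        $this->resource = $resource;
        $this->setChunkSize($chunkSize);
    }

    /** Mutator variant: changes the default for all later reads. */
    public function setChunkSize(int $chunkSize): void
    {
        if ($chunkSize < 1) {
            throw new \InvalidArgumentException('Chunk size must be positive');
        }
        $this->chunkSize = $chunkSize;
    }

    /** Optional-argument variant: $limit overrides the default for one call. */
    public function read(?int $limit = null): ?string
    {
        $data = fread($this->resource, $limit ?? $this->chunkSize);

        // null signals end of stream, mirroring the read() contract in the README.
        if ($data === false || ($data === '' && feof($this->resource))) {
            return null;
        }

        return $data;
    }
}

$resource = fopen('php://memory', 'w+');
fwrite($resource, 'abcdefgh');
rewind($resource);

$reader = new ChunkedReader($resource, 3);
$first = $reader->read();   // uses the constructor chunk size
$reader->setChunkSize(2);
$second = $reader->read();  // uses the updated chunk size
$third = $reader->read(1);  // per-call override
```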

Increase test coverage

We're currently at only 36%, which is way too low. This issue should stay open until we reach an acceptable level.

feof(): supplied resource is not a valid stream resource

kelunik@kelunik ❯ ~/GitHub/amphp/artax ❯ 10:24:18 ❯ master
$ php examples/1-get-request.php
HTTP/1.1 200 OK

{
  "user-agent": "Mozilla/5.0 (compatible; Artax)"
}

PHP Warning:  feof(): supplied resource is not a valid stream resource in /home/kelunik/GitHub/amphp/artax/vendor/amphp/byte-stream/lib/ResourceInputStream.php on line 60

I guess we can just suppress that one?
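
An alternative to suppressing the warning would be to check the resource before calling feof(). The sketch below assumes that a closed resource can simply be treated as "at end"; the helper name isAtEnd is hypothetical.

```php
<?php

/**
 * Hypothetical helper: reports end-of-stream without calling feof() on a
 * resource that has already been closed.
 *
 * @param resource|mixed $resource
 */
function isAtEnd($resource): bool
{
    // is_resource() returns false for closed resources, so feof() is skipped.
    return !\is_resource($resource) || \feof($resource);
}

$stream = fopen('php://memory', 'r+');
$openResult = isAtEnd($stream);   // open stream, nothing read yet
fclose($stream);
$closedResult = isAtEnd($stream); // closed resource, feof() is never called
```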

Assertion fail: Trying to read from a previously fclose()'d resource (Windows)

When trying to do a simple HTTP request with HttpClient, I invariably receive a DnsException thrown by Amp\Dns\Rfc1035StubResolver:197. It is part of a MultiReasonException containing two such exceptions, whose previous exception is an AssertionError from byte-stream\lib\ResourceInputStream.php:135, where an assertion with the following message fails:

Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.

Libraries

amphp/amp                             v2.5.2
amphp/beanstalk                       v0.3.2
amphp/byte-stream                     v1.8.1
amphp/cache                           v1.4.0
amphp/dns                             v1.2.3
amphp/hpack                           v3.1.0
amphp/http                            v1.6.3
amphp/http-client                     v4.5.5
amphp/http-client-cookies             v1.1.0
amphp/parallel                        v1.4.0
amphp/parser                          v1.0.0
amphp/postgres                        v1.3.3
amphp/process                         v1.1.1
amphp/serialization                   v1.0.0
amphp/socket                          v1.1.3
amphp/sql                             v1.0.1
amphp/sql-common                      v1.1.2
amphp/sync                            v1.4.0
amphp/uri                             v0.1.4
amphp/windows-registry                v0.3.3

Broken symlink in docs

Installing this library (as a dependency of amphp/parallel) through Composer results in a broken symlink in the docs directory:

$ ls -la vendor/amphp/byte-stream/docs
lrwxrwxrwx. 1 gen gen   13 Dec 27 15:28 asset -> .shared/asset
...
drwxrwxr-x. 2 gen gen   40 Oct 22 21:37 .shared
$ ls -la vendor/amphp/byte-stream/docs/.shared
total 0
drwxrwxr-x. 2 gen gen  40 Oct 22 21:37 .
drwxrwxr-x. 3 gen gen 300 Oct 22 21:37 ..

This broken link causes GitLab CI to generate a warning while building a project:

No URL provided, cache will be not downloaded from shared cache server. Instead a local version of cache will be extracted. 
WARNING: .../vendor/amphp/byte-stream/docs/asset: chmod .../vendor/amphp/byte-stream/docs/asset: no such file or directory (suppressing repeats) 

(see a related GitLab issue).

LineReader custom delimiter

Hi.
It would be great to be able to use custom delimiters in LineReader, e.g.

final class LineReader
{
    ...
    public function __construct(InputStream $inputStream, $delimiter = "\n")
    ...
}

$reader = new \Amp\ByteStream\LineReader($stream, "\x0a\x00");
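
The splitting logic behind such a delimiter option could look roughly like the sketch below. It works on a plain iterable of chunks rather than on an InputStream, and splitByDelimiter is a hypothetical name, not something LineReader exposes.

```php
<?php

/**
 * Hypothetical sketch: splits a sequence of chunks on an arbitrary,
 * possibly multi-byte delimiter that may span chunk boundaries.
 */
function splitByDelimiter(iterable $chunks, string $delimiter = "\n"): \Generator
{
    if ($delimiter === '') {
        throw new \InvalidArgumentException('Delimiter must not be empty');
    }

    $buffer = '';

    foreach ($chunks as $chunk) {
        $buffer .= $chunk;

        // Emit every complete line currently in the buffer.
        while (($pos = strpos($buffer, $delimiter)) !== false) {
            yield substr($buffer, 0, $pos);
            $buffer = substr($buffer, $pos + strlen($delimiter));
        }
    }

    // Whatever remains after the last delimiter is the final line.
    if ($buffer !== '') {
        yield $buffer;
    }
}

// The delimiter "\x0a\x00" is split across the first two chunks here.
$lines = iterator_to_array(
    splitByDelimiter(["a\x0a", "\x00b\x0a\x00c"], "\x0a\x00"),
    false
);
```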

Duplex stream

Hi,

Is it possible to have a duplex stream? A typical use case is sharing a stream and having one side reading from it, while the other is writing to it. Quite similar to Golang's channels.
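
For illustration only, plain PHP can already produce a connected pair of bidirectional streams with stream_socket_pair(). This sketch uses blocking reads and does not answer whether byte-stream should ship such an abstraction; STREAM_PF_UNIX is an assumption that holds on Unix-like systems.

```php
<?php

// STREAM_PF_UNIX is assumed here; on Windows, STREAM_PF_INET is needed instead.
$pair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

if ($pair === false) {
    throw new \RuntimeException('Could not create socket pair');
}

[$left, $right] = $pair;

// Each end can both write and read, so data flows in both directions.
fwrite($left, 'ping');
$request = fread($right, 4);

fwrite($right, 'pong');
$response = fread($left, 4);

fclose($left);
fclose($right);
```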

Problem writing to stream

I see this in my logs:

Amp\ByteStream\StreamException
Failed to write to stream after multiple attempts; fwrite(): send of 7302 bytes failed with errno=32 Broken pipe

This happens every few seconds. I can try to provide a reproducible script, but that may take some time (I'm currently on a business trip).

But I thought you might have an idea of what to check.

Immediate reads and writes might result in blocking

A tight loop of reading and writing, with data always available and the stream always writable, might result in a blocking loop. We should discuss measures to mitigate that, e.g. scheduling immediate writes with Loop::defer() when many of them happen in a tight loop.
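
One way to decide when to defer, sketched under the assumption that a simple counter of consecutive immediate operations is enough. ImmediateOperationBudget is a hypothetical class; the actual deferral (for example via Loop::defer()) is left to the caller and is not shown.

```php
<?php

/**
 * Hypothetical sketch: counts consecutive immediate reads/writes and tells
 * the caller when it should hand control back to the event loop.
 */
final class ImmediateOperationBudget
{
    private int $limit;
    private int $count = 0;

    public function __construct(int $limit)
    {
        $this->limit = $limit;
    }

    /** Records one immediate operation; returns true once the limit is exceeded. */
    public function shouldDefer(): bool
    {
        if (++$this->count > $this->limit) {
            $this->count = 0; // start a new budget after deferring
            return true;
        }

        return false;
    }

    /** Call when an operation actually had to wait, which breaks the tight loop. */
    public function reset(): void
    {
        $this->count = 0;
    }
}

$budget = new ImmediateOperationBudget(3);
$decisions = [];

for ($i = 0; $i < 5; $i++) {
    $decisions[] = $budget->shouldDefer();
}
```

With a limit of 3, the fourth consecutive immediate operation is the one that gets deferred, after which counting starts over.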

Drop Buffer

I hereby propose to drop Buffer. As far as I'm aware, there's not a single use of it in your libraries. It will be kept in the history and can be restored if the need arises.

It can stay if somebody is willing to write tests for it, but I'm currently not, and I don't want to have it here without any tests.
