mafintosh / duplexify
Turn a writable and readable stream into a streams2 duplex stream with support for async initialization and streams1/streams2 input
License: MIT License
My app receives a stream via socket.io-stream, emitted from the browser, and writes it to a file using fs. The stream content is binary. Instead of writing to a file, I want to upload it to box.com via their SDK, which requires a read stream.
I have tried passing the socket stream directly to the SDK upload method, but it fails; the upload method only seems to work with streams from fs.createReadStream.
With the duplexify package, can I create a duplex stream that reads from the socket.io stream and feeds the Box upload SDK?
Thanks
duplexify should respect the readableObjectMode and writableObjectMode options:
https://nodejs.org/dist/latest-v8.x/docs/api/stream.html#stream_object_mode_duplex_streams
The reason for posting this issue is that Google Storage appears to fail and call .destroy(...), which throws an unhandled error. I created an issue in the Google Storage repo.
We were able to reproduce the unhandled error in a test script: we created a simple stream with stream-events, pumpify, and duplexify, then ran .destroy(error), which threw an unhandled error.
I'm fairly new to JS, Node, and streams, so I don't fully understand what happens here, but I tried to debug a bit. When logging inside _destroy(...) and destroy(...), it looks like both of them are called several times, and the internal one is actually called after "finally" has been logged. Does this make sense? Why does this crash?
import * as streamEvents from "stream-events";
import { Duplex, PassThrough } from "stream";
const pumpify = require("pumpify");
const hashStreamValidation = require("hash-stream-validation");
const duplexify: any = require("duplexify");

class RequestError extends Error {
  code?: string;
  errors?: Error[];
}

function test() {
  const crc32c = true;
  const md5 = false;
  const validateStream = hashStreamValidation({ crc32c, md5 });
  const fileWriteStream = duplexify();
  const stream = streamEvents(
    pumpify([new PassThrough(), validateStream, fileWriteStream])
  ) as Duplex;

  const error = new RequestError("File upload failed.");
  error.code = "FILE_NO_UPLOAD";
  error.errors = [];

  try {
    fileWriteStream.destroy(error); // <-- Crashes
    // fileWriteStream.destroy(); // <-- Good
  } catch (err) {
    console.log("fileWriteStream: ", err);
  }
  stream.uncork();
}

try {
  test();
} catch (err) {
  console.log("test: ", err);
} finally {
  console.log("finally");
}
macOS: 11.6
duplexify: 4.1.2
pumpify: 2.0.1
hash-stream-validation: 0.2.4
stream-events: 1.0.5
npm: 6.14.22
Node: 12.22.1
I am using duplexify to create a duplex stream out of a child process. This enables me to create complex pipelines, combining processes with other streams. So I wrote a simple function that wraps duplexify and child_process.spawn as follows:
function run(program, args) {
  const child = spawn(program, args, {
    stdio: ["pipe", "pipe", "inherit"],
  });
  const stream = duplexify(child.stdin, child.stdout);
  child.on("error", (err) => {
    stream.destroy(err);
  });
  return stream;
}
Now I can use the stream returned by run with stream.pipeline. The only issue I have with this solution is that I would like run to wait for the child process's exit status and, if it is not zero, forward an error to the duplex stream. I tried to address this by adding a final method as follows:
function run(program, args) {
  const child = spawn(program, args, {
    stdio: ["pipe", "pipe", "inherit"],
  });
  const final = (stream, callback) => {
    child.on("exit", (code, signal) => {
      if (code !== 0) callback(new Error("child process failed"));
      else callback();
    });
  };
  const stream = duplexify(child.stdin, child.stdout, { final });
  child.on("error", (err) => {
    stream.destroy(err);
  });
  return stream;
}
but the final function never gets called by duplexify. Is there a way to solve this issue with duplexify?
I see this error. Here is the trace from minified code; I hope it gives you some clues about what the problem is:
#videomail > require<[56]</</Writable.prototype.write@https://videomail.io/js/main-36bb504261.min.js:15125:40
require<[26]</</Duplexify.prototype._write@https://videomail.io/js/main-36bb504261.min.js:9893:169
doWrite@https://videomail.io/js/main-36bb504261.min.js:15019:128
writeOrBuffer@https://videomail.io/js/main-36bb504261.min.js:15015:8
require<[56]</</Writable.prototype.write@https://videomail.io/js/main-36bb504261.min.js:15126:7
writeStream@https://videomail.io/js/main-36bb504261.min.js:18666:188
draw@https://videomail.io/js/main-36bb504261.min.js:18802:27
Taken from the latest Firefox.
It feels like the implementation of streams in Node.js 6+ has settled down a great deal. Would you consider a PR to drop the dependency on the userspace implementation of readable-stream? Or are there good reasons to keep using it?
It seems like you need to call setReadable before setWritable.
In my case I have a transform stream, trumpet, that I configure and initialize via an async process. Something like:
module.exports = function (opts) {
  var duplexStream = duplexify()
  var trumpet = trumpet()
  asyncInit(function () {
    duplexStream.setReadable(trumpet)
    duplexStream.setWritable(trumpet)
  })
  return duplexStream
}
However, changing the order causes it to break (I think it loses all buffered data):
duplexStream.setWritable(trumpet)
duplexStream.setReadable(trumpet)
Since nodejs/node#7077 was merged and released in [email protected], the internal buffer of a readable stream is no longer an array but a BufferList. Due to this, tests started breaking with that release with the following error:
TypeError: Cannot read property 'length' of undefined
at Connection.Duplexify._forward (/home/travis/build/ipfs/js-ipfs/node_modules/interface-connection/node_modules/duplexify/index.js:170:76)
at Duplex.onreadable (/home/travis/build/ipfs/js-ipfs/node_modules/interface-connection/node_modules/duplexify/index.js:127:10)
at emitNone (events.js:86:13)
at Duplex.emit (events.js:185:7)
at emitReadable_ (_stream_readable.js:433:10)
at emitReadable (_stream_readable.js:427:7)
at readableAddChunk (_stream_readable.js:188:13)
at Duplex.Readable.push (_stream_readable.js:135:10)
Problem: Although duplexify auto-wraps readable streams that do not appear to be streams2 implementations, errors emitted by the streams2 wrap (e.g. bubbled up from the wrapped stream) are unhandled and will most likely crash the current process unless you're lucky enough for the current call stack to have a try/catch in it somewhere.
An example from my use case is the Azure Storage SDK, where most of its stream functions return a custom stream instance called a ChunkStream which, to duplexify, doesn't appear to be streams2.
Repro:
> process.version
'v8.11.4'
> through2=require("through2"),duplexify=require("duplexify"),ChunkStream=require("azure-storage/lib/common/common.node").ChunkStream
...snip...
// Streams2
> thru=through2(),dup=duplexify(null,thru).on("error",console.log.bind(null,"dup caught it"))
...snip...
> thru.emit("error", new Error("boo"))
// OK; the only evidence of the error is through the handler I established
dup caught it Error: boo
...snip (call stack)...
// Non-Streams2
> chunk=new ChunkStream(),dup=duplexify(null,chunk).on("error",console.log.bind(null,"dup caught it"))
...snip...
> chunk.emit("error", new Error("boo"))
Error: boo // <<< OH NO! This is an unhandled error and may crash the process
// The error handler on dup also runs because unhandled errors don't crash the REPL
dup caught it { Error: boo
...snip...
Suggested fix: apply the equivalent end-of-stream behavior to this._readable2 that this._readable already has.
Is it possible to cut a new version with 6930368?
Refs: nodejs/citgm#605
Thank you.
I notice that the drain event is not forwarded from the writable end to the duplexified stream. Is this expected?
Refs: nodejs/node#8169, nodejs/node#7152.
Two options here:
1. Switch to Buffer.alloc()/Buffer.from()/Buffer.allocUnsafe() (requires a shim for v0.10/v0.12 and older v4.x versions prior to v4.5.0).
2. Keep new Buffer() for the time being; just add the new keyword everywhere. You should manually check that everything is safe in this case, see the links below for more explanation. That might be hard-deprecated at some later point.
More background:
Quick grep (please re-check):
duplexify-3.4.5.tgz/test.js:279: t.same(chunk, Buffer('hello world'))
duplexify-3.4.5.tgz/test.js:290: dup.write(Buffer('hello world'))
The grep above only includes lines that call Buffer() without the new keyword. If you choose to move to the new API, you should probably also replace new Buffer(…) calls.
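A sketch of the mechanical replacement for the flagged lines:

```javascript
// Buffer(string) becomes Buffer.from(string); allocating fresh,
// zero-filled memory becomes Buffer.alloc(size).
const a = Buffer.from('hello world'); // replaces Buffer('hello world')
const b = Buffer.alloc(16);           // replaces new Buffer(16); zero-filled

console.log(a.toString()); // hello world
console.log(b.length);     // 16
```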
It would be rad if this library was written in TypeScript. I'd be happy to do the work and submit a PR if that would be welcomed :) This would make it easier to include a d.ts
file for other TypeScript users, as well as the other TypeScript type advantages.
Would you be open to accepting a PR? I've prototyped it here.
https://github.com/JustinBeckwith/duplexify