
duplexify

Turn a writable and readable stream into a single streams2 duplex stream.

Similar to duplexer2, except it supports both streams2 and streams1 as input, and it allows you to set the readable and writable parts asynchronously using setReadable(stream) and setWritable(stream).

npm install duplexify


Usage

Use duplexify(writable, readable, streamOptions) (or duplexify.obj(writable, readable) to create an object stream).

var duplexify = require('duplexify')

// turn writableStream and readableStream into a single duplex stream
var dup = duplexify(writableStream, readableStream)

dup.write('hello world') // will write to writableStream
dup.on('data', function(data) {
  // will read from readableStream
})

You can also set the readable and writable parts asynchronously:

var dup = duplexify()

dup.write('hello world') // write will buffer until the writable
                         // part has been set

// wait a bit ...
dup.setReadable(readableStream)

// maybe wait some more?
dup.setWritable(writableStream)

If you call setReadable or setWritable multiple times it will unregister the previous readable/writable stream. To disable the readable or writable part call setReadable or setWritable with null.

If the readable or writable stream emits an error or close, duplexify will destroy both streams and bubble up the event. You can also destroy the streams explicitly by calling dup.destroy(). The destroy method optionally takes an error object as an argument, in which case the error is emitted as part of the error event.

dup.on('error', function(err) {
  console.log('readable or writable emitted an error - close will follow')
})

dup.on('close', function() {
  console.log('the duplex stream is destroyed')
})

dup.destroy() // calls destroy on the readable and writable part (if present)

HTTP request example

Turning a Node core HTTP request into a duplex stream is as easy as:

var duplexify = require('duplexify')
var http = require('http')

var request = function(opts) {
  var req = http.request(opts)
  var dup = duplexify(req)
  req.on('response', function(res) {
    dup.setReadable(res)
  })
  return dup
}

var req = request({
  method: 'GET',
  host: 'www.google.com',
  port: 80
})

req.end()
req.pipe(process.stdout)

License

MIT

Related

duplexify is part of the mississippi stream utility collection which includes more useful stream modules similar to this one.


duplexify's Issues

state is undefined

I see this error. Here is the trace from minified code, but I hope it gives you some clues about the problem:

#videomail > require<[56]</</Writable.prototype.write@https://videomail.io/js/main-36bb504261.min.js:15125:40
require<[26]</</Duplexify.prototype._write@https://videomail.io/js/main-36bb504261.min.js:9893:169
doWrite@https://videomail.io/js/main-36bb504261.min.js:15019:128
writeOrBuffer@https://videomail.io/js/main-36bb504261.min.js:15015:8
require<[56]</</Writable.prototype.write@https://videomail.io/js/main-36bb504261.min.js:15126:7
writeStream@https://videomail.io/js/main-36bb504261.min.js:18666:188
draw@https://videomail.io/js/main-36bb504261.min.js:18802:27

Taken from the latest Firefox.

Catch the status of a child process as a stream error

I am using duplexify to create a duplex stream out of a child process. This enables me to create complex pipelines, combining processes with other streams. So, I wrote a simple function that wraps duplexify and child_process.spawn as follows:

function run(program, args) {
	const child = spawn(program, args, {
		stdio: ["pipe", "pipe", "inherit"],
	});
	const stream = duplexify(child.stdin, child.stdout);
	child.on("error", (err) => {
		stream.destroy(err);
	});
	return stream;
}

Now I can use the stream returned by run with stream.pipeline. The only issue with this solution is that I would like run to wait for the child process's exit status and, if it is not zero, forward an error to the duplex stream. I tried to address this by adding a final method as follows:

function run(program, args) {
	const child = spawn(program, args, {
		stdio: ["pipe", "pipe", "inherit"],
	});
	const final = (stream, callback) => {
		child.on("exit", (code, signal) => {
			if (code !== 0)
				callback(new Error("child process failed"));
			else
				callback();
		});
	};
	const stream = duplexify(child.stdin, child.stdout, {final});
	child.on("error", (err) => {
		stream.destroy(err);
	});
	return stream;
}

but the final function never gets called by duplexify. Is there a way to solve this issue with duplexify?

race condition

It seems like you need to call setReadable before setWritable.

In my case I have a transform stream, trumpet, that I configure and initialize via an async process.

Something like:

var duplexify = require('duplexify')

module.exports = function (opts) {

  var duplexStream = duplexify()
  var trumpet = require('trumpet')()

  asyncInit(function () {

    duplexStream.setReadable(trumpet)
    duplexStream.setWritable(trumpet)

  })

  return duplexStream

}

However, changing the order causes it to break (I think it loses all the buffered data):

duplexStream.setWritable(trumpet)
duplexStream.setReadable(trumpet)

socket.io-stream question

My app receives a stream via socket.io-stream, emitted from the browser, and writes it to a file using fs. The stream content is binary. But instead of writing to a file I want to upload to box.com via their SDK, which requires a read stream.

I have tried passing the socket stream directly to the SDK upload method, but it fails. The SDK upload method only works with streams from fs.createReadStream.

I need to know whether, with the duplexify package, I can create a duplex stream which reads from the socket.io stream and outputs to the Box upload SDK.

Thanks

Convert to TypeScript

It would be rad if this library were written in TypeScript. I'd be happy to do the work and submit a PR if that would be welcomed :) This would make it easier to include a d.ts file for other TypeScript users, along with the other advantages of TypeScript's type system.

Would you be open to accepting a PR? I've prototyped it here.
https://github.com/JustinBeckwith/duplexify

Errors emitted by non-streams2 unhandled

Problem: Although duplexify auto-wraps readable streams that do not appear to be streams2 implementations, errors emitted by the streams2 wrap (e.g. bubbled up from the wrapped stream) are unhandled and will most likely crash the current process, unless you're lucky enough for the current call stack to have a try/catch in it somewhere.

An example from my use case is in the Azure Storage SDK where most of its stream functions return a custom stream instance called a ChunkStream which, to duplexify, doesn't appear to be streams2.

Repro:

> process.version
'v8.11.4'
> through2=require("through2"),duplexify=require("duplexify"),ChunkStream=require("azure-storage/lib/common/common.node").ChunkStream
...snip...


// Streams2
> thru=through2(),dup=duplexify(null,thru).on("error",console.log.bind(null,"dup caught it"))
...snip...
> thru.emit("error", new Error("boo"))
// OK; the only evidence of the error is through the handler I established
dup caught it Error: boo
...snip (call stack)...

// Non-Streams2
> chunk=new ChunkStream(),dup=duplexify(null,chunk).on("error",console.log.bind(null,"dup caught it"))
...snip...
> chunk.emit("error", new Error("boo"))
Error: boo // <<< OH NO! This is an unhandled error and may crash the process
// The error handler on dup also runs because unhandled errors don't crash the REPL
dup caught it { Error: boo
...snip...

Suggested Fix: Apply equivalent "end-of-stream" behavior to this._readable2 as this._readable has.

Unhandled 'error' event

The reason for posting this issue is that it looks like Google Storage fails and calls .destroy(...), which throws an unhandled error. I created an issue in the google storage repo.

We were able to reproduce the unhandled error in a test script. We created a simple stream with stream events, pumpify, and duplexify and ran .destroy(error) which threw an unhandled error.

I'm kinda new to JS, Node and streams, so I don't really understand everything that happens here, but I tried to debug a bit. When logging inside _destroy(...) and destroy(...) it looks like both of them are called several times, and the internal one is actually called after "finally" has been logged. Does this make sense? Why does this crash?

Script to reproduce error

import * as streamEvents from "stream-events";
const pumpify = require('pumpify');
import {Duplex, PassThrough} from 'stream';
const hashStreamValidation = require('hash-stream-validation');
const duplexify: any = require('duplexify');


class RequestError extends Error {
    code?: string;
    errors?: Error[];
}

function test() {
    let crc32c = true;
    let md5 = false;

    const validateStream = hashStreamValidation({
        crc32c,
        md5,
    });

    const fileWriteStream = duplexify();
    const stream = streamEvents(
        pumpify([
            new PassThrough(),
            validateStream,
            fileWriteStream,
        ])
    ) as Duplex;

    let code;
    let message;

    code = 'FILE_NO_UPLOAD';
    message = 'File upload failed.';
    const error = new RequestError(message);
    error.code = code;
    error.errors = [];
    try {
        fileWriteStream.destroy(error); // <-- crashes
        // fileWriteStream.destroy(); // <-- good
    } catch (err) {
        console.log("fileWriteStream: ", err)
    }
    stream.uncork();
}

try {
    test();
} catch (err) {
    console.log("test: ", err);
} finally {
    console.log("finally");
}

Environment

MacOS: 11.6
Duplexify: 4.1.2
Pumpify: 2.0.1
Hash-stream-validation: 0.2.4
Stream events: 1.0.5
Npm: 6.14.22
Node: 12.22.1

'drain' event not forwarded

I notice that the drain event is not forwarded from the writable end to the duplexified stream.
Is this expected?

Drop dependency on readable-stream

It feels like the implementation of streams in Node.js 6+ has settled down a great deal. Would you consider a PR to drop the dependency on the user space implementation of readable-stream? Or are there good reasons to keep on using it?

`Buffer()` without a `new` keyword is going to be hard-deprecated soon

Refs: nodejs/node#8169, nodejs/node#7152.

Two options here:

  • Use the new Buffer API — Buffer.alloc()/Buffer.from()/Buffer.allocUnsafe() (requires a shim for v0.10/v0.12 and older v4.x versions prior to v4.5.0).
  • Use new Buffer() for the time being — just add the new keyword everywhere. You should manually check that everything is safe in this case, see the links below for more explanation. That might be hard-deprecated at some later point.

More background:

Quick grep (you should re-check):

duplexify-3.4.5.tgz/test.js:279:      t.same(chunk, Buffer('hello world'))
duplexify-3.4.5.tgz/test.js:290:    dup.write(Buffer('hello world'))

The grep above only includes the lines that call Buffer() without the new keyword; if you choose to move to the new API, you should probably also replace the new Buffer(…) calls.
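The migration for calls like the two test.js lines above would look like this (illustrative; requires Node 4.5+ or a shim for older versions):

```javascript
// New-style Buffer API: explicit factory methods instead of the deprecated
// Buffer(...) constructor.
const fromString = Buffer.from('hello world') // was: Buffer('hello world')
const zeroed = Buffer.alloc(16)               // zero-filled allocation
const fast = Buffer.allocUnsafe(16)           // uninitialized; fill before use

console.log(fromString.toString(), zeroed[0])
```

The split into alloc()/allocUnsafe() is the point of the deprecation: the old constructor silently returned uninitialized memory for numeric arguments.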

Incompatible with ReadableStream from [email protected]

Since nodejs/node#7077 was merged and released in [email protected], the internal buffer of a readable stream is no longer an array but a BufferList. Due to this, tests started breaking with that release with the following error:

TypeError: Cannot read property 'length' of undefined
    at Connection.Duplexify._forward (/home/travis/build/ipfs/js-ipfs/node_modules/interface-connection/node_modules/duplexify/index.js:170:76)
    at Duplex.onreadable (/home/travis/build/ipfs/js-ipfs/node_modules/interface-connection/node_modules/duplexify/index.js:127:10)
    at emitNone (events.js:86:13)
    at Duplex.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:433:10)
    at emitReadable (_stream_readable.js:427:7)
    at readableAddChunk (_stream_readable.js:188:13)
    at Duplex.Readable.push (_stream_readable.js:135:10)
