through2's Introduction

through2

A tiny wrapper around Node.js stream.Transform (Streams2/3) to avoid explicit subclassing noise.

Inspired by Dominic Tarr's through in that it's so much easier to make a stream out of a function than it is to set up the prototype chain properly: through(function (chunk) { ... }).

const fs = require('fs')
const through2 = require('through2')

fs.createReadStream('ex.txt')
  .pipe(through2(function (chunk, enc, callback) {
    for (let i = 0; i < chunk.length; i++)
      if (chunk[i] == 97)
        chunk[i] = 122 // swap 'a' for 'z'

    this.push(chunk)

    callback()
  }))
  .pipe(fs.createWriteStream('out.txt'))
  .on('finish', () => doSomethingSpecial())

Or object streams:

const fs = require('fs')
const through2 = require('through2')
const csv2 = require('csv2') // any CSV row-parser stream will do here

const all = []

fs.createReadStream('data.csv')
  .pipe(csv2())
  .pipe(through2.obj(function (chunk, enc, callback) {
    const data = {
        name    : chunk[0]
      , address : chunk[3]
      , phone   : chunk[10]
    }
    this.push(data)

    callback()
  }))
  .on('data', (data) => {
    all.push(data)
  })
  .on('end', () => {
    doSomethingSpecial(all)
  })

Note that through2.obj(fn) is a convenience wrapper around through2({ objectMode: true }, fn).

Do you need this?

Since Node.js introduced Simplified Stream Construction, many uses of through2 have become redundant. Consider whether you really need through2, or whether you can just use the 'readable-stream' package (a userland copy of the core 'stream' module) or the core 'stream' module itself:

const { Transform } = require('readable-stream')

const transformer = new Transform({
  transform(chunk, enc, callback) {
    // ...
  }
})
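
For comparison, here is a minimal end-to-end sketch of that approach using only core modules; the file names and the uppercasing transform are illustrative assumptions, not part of through2:

const fs = require('fs')
const { Transform } = require('stream')

const upperCaser = new Transform({
  transform (chunk, enc, callback) {
    // push the transformed chunk and signal completion in one step
    callback(null, chunk.toString().toUpperCase())
  }
})

fs.createReadStream('input.txt')
  .pipe(upperCaser)
  .pipe(fs.createWriteStream('output.txt'))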

API

through2([ options, ] [ transformFunction ] [, flushFunction ])

Consult the stream.Transform documentation for the exact rules of the transformFunction (i.e. this._transform) and the optional flushFunction (i.e. this._flush).

options

The options argument is optional and is passed straight through to stream.Transform. So you can use objectMode:true if you are processing non-binary streams (or just use through2.obj()).

The options argument is first, unlike standard convention, because if I'm passing in an anonymous function then I'd prefer for the options argument to not get lost at the end of the call:

fs.createReadStream('/tmp/important.dat')
  .pipe(through2({ objectMode: true, allowHalfOpen: false },
    (chunk, enc, cb) => {
      cb(null, 'wut?') // note we can use the second argument on the callback
                       // to provide data as an alternative to this.push('wut?')
    }
  ))
  .pipe(fs.createWriteStream('/tmp/wut.txt'))

transformFunction

The transformFunction must have the following signature: function (chunk, encoding, callback) {}. A minimal implementation should call the callback function to indicate that the transformation is done, even if that transformation means discarding the chunk.

To queue a new chunk, call this.push(chunk)—this can be called as many times as required before the callback() if you have multiple pieces to send on.

Alternatively, you may use callback(err, chunk) as shorthand for emitting a single chunk or an error.
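
For instance, a hedged sketch (the splitting and uppercasing logic here is just an illustration) of pushing several chunks per input chunk, plus the single-chunk callback shorthand:

const through2 = require('through2')

// multiple outputs per input: push as many times as needed, then call back
const splitter = through2(function (chunk, enc, callback) {
  for (const word of chunk.toString().split(' ')) {
    this.push(word + '\n')
  }
  callback()
})

// single output per input: the callback shorthand avoids this.push() entirely
const shouter = through2((chunk, enc, callback) => {
  callback(null, chunk.toString().toUpperCase())
})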

If you do not provide a transformFunction then you will get a simple pass-through stream.

flushFunction

The optional flushFunction, provided as the last argument (2nd or 3rd, depending on whether you've supplied options), is called just prior to the stream ending. It can be used to finish up any processing that may be in progress.

fs.createReadStream('/tmp/important.dat')
  .pipe(through2(
    (chunk, enc, cb) => cb(null, chunk), // transform is a noop
    function (cb) { // flush function
      this.push('tacking on an extra buffer to the end');
      cb();
    }
  ))
  .pipe(fs.createWriteStream('/tmp/wut.txt'));

through2.ctor([ options, ] transformFunction[, flushFunction ])

Instead of returning a stream.Transform instance, through2.ctor() returns a constructor for a custom Transform. This is useful when you want to use the same transform logic in multiple instances.

const FToC = through2.ctor({objectMode: true}, function (record, encoding, callback) {
  if (record.temp != null && record.unit == "F") {
    record.temp = ( ( record.temp - 32 ) * 5 ) / 9
    record.unit = "C"
  }
  this.push(record)
  callback()
})

// Create instances of FToC like so:
const converter = new FToC()
// Or:
const converter2 = FToC()
// Or specify/override options when you instantiate, if you prefer:
const converter3 = FToC({ objectMode: true })

License

through2 is Copyright © Rod Vagg and additional contributors and licensed under the MIT license. All rights not explicitly granted in the MIT license are reserved. See the included LICENSE file for more details.

through2's Issues

Async issue

Hi all,

I'm replicating together two hyperlogs, and I find that this works:

aliceReplication
  .pipe(through(function (buf, enc, next) {
    next(null, buf)
  }))
  .pipe(bobReplication)
  .pipe(aliceReplication)

but, this does not

aliceReplication
  .pipe(through(function (buf, enc, next) {
    setTimeout(() => next(null, buf), 1)
  }))
  .pipe(bobReplication)
  .pipe(aliceReplication)

I find this behavior with all asynchronous functions. Any idea why this might be happening? I am having trouble finding similar issues.

(The stuff in my through stream needs to call back asynchronously - setTimeout serves as a minimal example).

writing data from outside the write callback

in require('through') you could do this:

var through = require('through')
var stream = through()
stream.queue(new Buffer('hello'))
stream.queue(new Buffer('world'))

and stream would emit two buffers.

in through2 if you do this:

var through = require('through2')
var stream = through()
stream.push(new Buffer('hello'))
stream.push(new Buffer('world'))

only the first .push gets emitted. I believe that stream.Transform from Node core will only emit subsequent chunks after the callback from the _transform function is called, but I can't access the callback since I'm trying to write data from outside the _transform function.

Do you know how to get _transform to piss off and let me emit data?
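
One possible workaround, sketched here without claiming it is the maintainers' recommendation: write() into a PassThrough instead of push()ing into a Transform from outside _transform, and let the core stream machinery queue the chunks.

const { PassThrough } = require('stream')

const stream = new PassThrough()
stream.write(Buffer.from('hello'))
stream.write(Buffer.from('world'))
stream.end()

// both buffers are emitted once something consumes the stream
stream.on('data', (chunk) => console.log(chunk.toString()))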

object stream throwing error

function splitHandler(chunk, enc, cb) {
  var queryparams = chunk.split(',')[7];
  var queryobj = querystring.parse(queryparams);
  this.push(queryobj);
  cb();
}

function readQueueHandler(file_path, cb){
  var stream = fs.createReadStream(file_path)
                 .pipe(zlib.createGunzip())
                 .pipe(es.split())
                 .pipe(through2.obj(splitHandler))
                 .pipe(process.stdout)

  stream.on('end', cb);
}
net.js:614
    throw new TypeError('invalid data');
          ^
TypeError: invalid data
    at WriteStream.Socket.write (net.js:614:11)
    at write (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:605:24)
    at flow (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:614:7)
    at Transform.pipeOnReadable (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:646:5)
    at Transform.emit (events.js:92:17)
    at emitReadable_ (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:430:10)
    at emitReadable (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:426:5)
    at readableAddChunk (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:187:9)
    at Transform.Readable.push (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_readable.js:149:10)
    at Transform.push (/home/app/aquirer_shuffle/node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:145:32)

If I change splitHandler to return a string it works fine. Any kind of object throws the same error.
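
A hedged sketch of one likely fix: process.stdout is a plain (non-object-mode) writable that only accepts strings and Buffers, so serialising the objects in a final step before stdout avoids the "invalid data" error. The require names mirror the snippet above and are assumptions about the poster's setup; file_path and splitHandler come from that snippet.

var fs = require('fs');
var zlib = require('zlib');
var querystring = require('querystring');
var es = require('event-stream');   // assumed source of es.split()
var through2 = require('through2');

fs.createReadStream(file_path)
  .pipe(zlib.createGunzip())
  .pipe(es.split())
  .pipe(through2.obj(splitHandler))
  .pipe(through2.obj(function (obj, enc, cb) {
    cb(null, JSON.stringify(obj) + '\n'); // back to strings for stdout
  }))
  .pipe(process.stdout);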

Mistake in README.md?

May be this is my mistake but I suppose that record.unit = "F" should be record.unit == "F". Am I wrong?

Transform request body

Hi guys,
congrats for this awesome lib.

I need to transform the request body before it has been proxied.
This is my code

function write(chunk, encoding, next){
  var transformed_body = new BodyHelper(JSON.parse(chunk), this.body_transformation).apply()
  this.push(JSON.stringify(transformed_body))
  next();
}

var transform = through.obj(write);
transform.body_transformation = // custom rule
req.pipe(transform).pipe(request.post(settings)).pipe(res)

In this example the object transformed_body is correctly formatted, but the following piping not receive the body correctly.
Instead even if I use .obj function I need to stringify the object because it throw the error Argument passed must be a string or buffer, why?

Where I'm making a mistake?

Thanks

Is there a way to end a stream in a promise?

function install(cb) {
    let pluginsArr = [];
    for (let name in plugins) {
        pluginsArr.push(`${name}@${plugins[name]}`)
    }
    npmLoad().then(function () {
        let installCmd = Promise.promisify(npm.commands.install, {multiArgs: true})
        return installCmd('.', pluginsArr)
    }).then(function (list) {
        cb();
    }, function (err){
        this.emit('error', new gulpUtil.PluginError('dependencies-install', err));
    });

}

Error: Cannot find module 'readable-stream/transform'

Pretty self-explanatory:

$ gulp

Error: Cannot find module 'readable-stream/transform'
    at Function.Module._resolveFilename (module.js:338:15)
    at Function.Module._load (module.js:280:25)
    at Module.require (module.js:364:17)
    at require (module.js:380:17)
    at Object.<anonymous> (/var/www/videomail/staging/source/node_modules/gulp-util/node_modules/through2/through2.js:1:79)
    at Module._compile (module.js:456:26)
    at Object.Module._extensions..js (module.js:474:10)
    at Module.load (module.js:356:32)
    at Function.Module._load (module.js:312:12)
    at Module.require (module.js:364:17)
$ node -v
v0.10.36

And I spotted this during npm install

npm WARN unmet dependency /var/www/videomail/staging/source/node_modules/browserify/node_modules/deps-sort/node_modules/through2 requires readable-stream@'~1.0.17' but will load
npm WARN unmet dependency undefined,
npm WARN unmet dependency which is version undefined

Hmmm, what should I do?

Through2 should emit('end') after flush.

Hey Rod,

I'm having problems getting a pipeline to shut down so my program can exit. It looks like my through2 pipe is not passing along the end event of its readable stream. If I explicitly emit('end') in the flush function, all goes well:

var objectStreamInspector = through.obj(function(chunk, enc, transformCallback) {
    console.dir(chunk);
    this.push(chunk);
    transformCallback();
}, function flush(flushCallback){
    console.log('flushing');
    this.push(null);
    this.emit('end'); //if I don't do this, even though the input stream ends, this stream won't
    flushCallback();
});

It looks to me like transform streams are supposed to emit('end') after their flush. The stream (v0.10.33) docs say:

end is fired after all data has been output which is after the callback in _flush has been called.

I might be misunderstanding this.

--Brad Olson
ABS
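
A hedged note in code form: 'end' is a readable-side event and, per the docs quoted above, fires after the _flush callback, but only once the data has actually been read. So consuming the stream (pipe it, resume() it, or attach a 'data' listener) should let it end without emitting 'end' by hand. This is a sketch against the snippet above, not a confirmed maintainer answer.

objectStreamInspector
  .on('data', function (chunk) { /* consume so 'end' can fire */ })
  .on('end', function () { console.log('inspector ended'); });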

Async issues?

Hi,

I have a bug that seems to surface only when I use the callback() function with a setTimeout/setImmediate, etc.

So basically, if I fail to call callback() before the transformer function exits, the error occurs. Is this normal? I've found that sometimes I need to call callback asynchronously if I need to do some IO first before calling it.

The error is `SyntaxError: Unexpected end of input...` but I can't see where it's being triggered from exactly.

Can't I pipe a stream to an object stream?

var through2 = require('through2');

process.stdin.pipe(through2(function (chunk, _, next) {
  this.push({
    data: chunk.toString()
  });
  return next();
}))
  .pipe(through2.obj(function (chunk, _, next) {
    //do something
  })).pipe(process.stdout);

It throws "TypeError: Invalid non-string/buffer chunk". Must I stringify the output object when pushing and then parse it in the next stage of the pipeline?
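
A hedged sketch of one way to make this work: the first transform pushes a plain object, so it needs to run in object mode as well (through2.obj accepts Buffer input happily), and the final stage has to hand strings or Buffers back to process.stdout. The JSON serialisation at the end is an illustrative choice, not the only option.

var through2 = require('through2');

process.stdin
  .pipe(through2.obj(function (chunk, _, next) {
    next(null, { data: chunk.toString() });
  }))
  .pipe(through2.obj(function (obj, _, next) {
    // do something with obj, then serialise for stdout
    next(null, JSON.stringify(obj) + '\n');
  }))
  .pipe(process.stdout);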

How can I create a stream?

I tried

through2.obj(function(file, encoding, callback) {
    this.emit('data', '...');
    this.emit('end');

   callback()
})

but it does not work with pipes.
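
A hedged sketch of the more conventional shape: queue output with this.push() (or the callback shorthand) and let the stream end itself when its input ends, rather than emitting 'data'/'end' manually.

var through2 = require('through2');

var stream = through2.obj(function (file, encoding, callback) {
  this.push('...');   // queue whatever should come out
  callback();         // signal that this chunk has been handled
});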

Unhandled stream error in pipe

I'm doing
dump_stream.pipe(through2({objectMode:true}, my_transformer))

and getting this:

Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: throw er; // Unhandled stream error in pipe.
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: ^
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: Error: invalid event name: None
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at DestroyableTransform.my_transformer as _transform
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at DestroyableTransform.Transform._read (/app/node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:184:10)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at DestroyableTransform.Transform._write (/app/node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:172:12)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at doWrite (/app/node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:237:10)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at writeOrBuffer (/app/node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:227:5)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at DestroyableTransform.Writable.write (/app/node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:194:11)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at Stream.ondata (stream.js:51:26)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at Stream.emit (events.js:95:17)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at Stream.reemit (/app/node_modules/mixpanel/node_modules/event-stream/node_modules/duplexer/index.js:70:25)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at Stream.emit (events.js:95:17)
Feb 27 16:11:20 analytics-migration app/analytics_migration_2.1: at Stream. (/app/node_modules/mixpanel/node_modules/event-stream/index.js:237:12)

My code doesn't create, subscribe to, or otherwise touch any events, so I suspect this is a bug in through2?

My transformer does call its callback with an error sometimes:
callback(new Error(chunk.error))
But that is correct, right?

Respecting encoding?

Shouldn't enc in the example below be 'utf8'? In node 0.10.26 it's 'buffer':

var fs = require('fs');
var through2 = require('through2');
fs.createReadStream('./file.json', { encoding: 'utf8' })
  .pipe(through2(function(chunk, enc, cb) {
    console.log('chunk', chunk, 'enc', enc);
    this.push(chunk);
    cb();
  }))

Is it expected that transforms are doing string conversions? Am I doing something wrong?
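
A hedged sketch of what is probably going on: the writable side of a Transform converts incoming strings to Buffers by default (decodeStrings: true), which is why enc reports 'buffer'. Passing decodeStrings: false, a standard stream option that through2 forwards, should keep the string and its encoding intact. Treat this as an assumption to verify, not a confirmed answer.

var fs = require('fs');
var through2 = require('through2');

fs.createReadStream('./file.json', { encoding: 'utf8' })
  .pipe(through2({ decodeStrings: false }, function (chunk, enc, cb) {
    console.log(typeof chunk, enc); // expected under this assumption: 'string' 'utf8'
    cb(null, chunk);
  }));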

npm ERR! shasum check failed for /tmp/npm-8839-e9fb0d5e/registry.npmjs.org/through2/-/through2-0.6.5.tgz

Hi, I keep getting this error when I try to install dependencies.

ERR! node v4.0.0
npm ERR! npm v2.14.3

npm ERR! shasum check failed for /tmp/npm-8839-e9fb0d5e/registry.npmjs.org/through2/-/through2-0.6.5.tgz
npm ERR! Expected: 41ab9c67b29d57209071410e1d7a7a968cd3ad48
npm ERR! Actual: da39a3ee5e6b4b0d3255bfef95601890afd80709
npm ERR! From: https://registry.npmjs.org/through2/-/through2-0.6.5.tgz
npm ERR!
npm ERR! If you need help, you may report this error at:
npm ERR! https://github.com/npm/npm/issues

Missing CHANGELOG?

I am upgrading to the latest version and wondering whether there is a CHANGELOG file or any similar document listing the breaking changes/features.

please use a well known open source license

MIT +no-false-attribs was cooked up by the initial author of npm.
Since then he has switched almost all his work to the ISC license.

Adding a custom clause to a license actually makes it a brand new
license, whose interpretation is unforeseeable unless you're a specialized lawyer.

In this special case of through2, it does seem to be useful anyway.

Upgrade to streams3

Can we bump the readable-stream version that this module uses? I want to use methods like cork and uncork.

No transformFunction, but not passing the same data

I am using through2 to inspect a stream that I proxy.

But the input is not the same as the output apparently, even if I use a pass-through stream like this:

.pipe(through2())

I also tried .pipe(through2({decodeStrings: false})), but that did not help.

How can that be? I am piping a request to a node http res. I am really lost on this!

And according to the docs, if you do not pass anything:

If you do not provide a transformFunction then you will get a simple pass-through stream.

'no writecb' error thrown

Any reason why I would be getting the following error:

uncaughtException no writecb in Transform class
Error: no writecb in Transform class
    at afterTransform (/Users/scott/www/particle/node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:92:33)
    at TransformState.afterTransform (/Users/scott/www/particle/node_modules/through2/node_modules/readable-stream/lib/_stream_transform.js:76:1

The code worked using the map-stream module. I converted it to try this module and am getting this error now.

Thanks.

How can I resume()/pause() a through2 stream? <-- Error: Cannot switch to old mode now.

I have a through2 stream whose function runs

...
self.pause();
somethingAsyncOnChunk(chunk);
done();
...

and the stream is resumed when an event happens (outside the callback passed to the through2.obj constructor).

But this code throws Error: Cannot switch to old mode now..

I am using pause()/resume() because I cannot pass any callback to the function somethingAsyncOnChunk(chunk).

Apparently it is not possible to use them because the stream I have is in a pipeline (pipe()).
But how may I pause()/resume() the stream?

Issue getting the basics to work

Hello again, @rvagg - thanks for the amazing set of tools.

I am currently using map-stream by @dominictarr and am trying to convert to through2 for the same functionality.

I have stripped away all of the business logic and am left with a simple pass through:

pipeline.pipe(map(function(file, cb) {
  console.log('I work!');
  cb(null, file);
}));

I really would like to use through2, and if I am understanding the docs correctly, this would look like:

pipeline.pipe(through2(function(chunk, enc, callback) {
  this.push(chunk);
  callback();
}));

However, when using the above code, I am consistently getting the following error:

TypeError: Invalid non-string/buffer chunk

I have additionally tried through2-map directly since my final solution only needs the mapping capabilities, but still no luck.

There must be something simple I am overlooking... can you assist? I appreciate your time.

stream.push() after EOF?

Hi, I've created a gulp plugin (https://github.com/VaJoy/gulp-embed) but it runs with some problems. Here's the code of my plugin:

'use strict';

var path = require('path');
var gutil = require('gulp-util');
var through2 = require('through2');
var PluginError = gutil.PluginError;
var ResourceEmbedder = require('./lib/resource-embedder2');

module.exports = function (options) {
    return through2.obj(function (file, enc, cb) {

        var filepath = file.path;

        // file does not exist
        if (file.isNull()) {
            this.emit('error', new PluginError('gulp-embed', 'File not found: "' + filepath + '"'));
            return cb();
        }

        var embedder = new ResourceEmbedder(filepath);

        embedder.get(function (markup) {
            var f = new gutil.File({
                cwd: '',
                path: path.basename(filepath),
                contents: new Buffer(markup)
            });

            this.push(f);
            cb();
        }.bind(this));

    })
};

It's weird that sometimes it runs well, but sometimes it'll fail with the errors below:

events.js:85
      throw er; // Unhandled 'error' event
            ^
Error: stream.push() after EOF
  at readableAddChunk (D:\github\test\node_modules\gulp-embed\demo\node_modules\gulp-embed\node_modules\through2\node_modules\readable-stream\lib\_stream_readable.js:184:15)
  at DestroyableTransform.Readable.push (D:\github\test\node_modules\gulp-embed\demo\node_modules\gulp-embed\node_modules\through2\node_modules\readable-stream\lib\_stream_readable.js:162:10)
  at DestroyableTransform.Transform.push (D:\github\test\node_modules\gulp-embed\demo\node_modules\gulp-embed\node_modules\through2\node_modules\readable-stream\lib\_stream_transform.js:133:32)
  at DestroyableTransform.<anonymous> (D:\github\test\node_modules\gulp-embed\demo\node_modules\gulp-embed\index.js:31:18)
  at doEmbedding (D:\github\test\node_modules\gulp-embed\demo\node_modules\gulp-embed\lib\resource-embedder2\src\resource-embedder.coffee:82:9)
  at process._tickCallback (node.js:355:11)

The problem seems to be that the stream is ending before I push the file, but I have no idea about how that happens or how to solve it.

how to transform the data in my stream?

First of all, thank you for your library!

I need to transform the data of my stream.

Here goes my code:

var body = '';

var stream = through2(function(chunk, enc, callback) {
  var result = body;
  this.push(result);
  callback();
});

req.pipe(stream);

Is it correct? May I use this new stream?

Thank you for your help

Multiple outputs from transform function are not supported

Hi,

Sorry in advance if I misunderstand the concept of working with the library, but I had to modify it a bit to suit my needs — probably, this can be considered as an issue.

The case was the following: the transform function resulted in several output pieces to be written. Say, I have an incoming string like "Key=1, Value=1; Key=2, Value=2". I need to split them into

{ Key : 1, Value : 1 }, { Key : 2, Value : 2 }.

The transform function reads from the stream and, once it encounters ";", it creates a new object and calls back. The problem is that by the time the callback should be called the second time, it's already null (the afterTransform function in _stream_transform does this). I had to comment out the lines

ts.writecb = null;

in afterTransform and

state.writecb = null;

in _stream_writable / doWrite.

question: do these streams close themselves after idle periods?

I'm probably misusing this library, but I'm noticing that my streams are ending after a certain amount of time. Would that happen after a certain amount of idle time? Or perhaps when the stream has been drained?

Working with a "stream to slack" logger:

function Slack (url, options) {
  let send = Send(url)(options)

  return through(function (chunk, enc, fn) {
    fn(null, chunk)
    let lines = chunk.length ? chunk.toString().split('\n') : ['']
    for (let i = 0, line; line = lines[i]; i++) {
      if ((lines[i].length === 0) && (i == lines.length - 1)) break
      let log = JSON.parse(line)
      send.apply(null, format(log))
    }
  })
}

It could easily be something else too, but I'm noticing that all my loggers using through2 stop working after some time.

TypeError: Cannot read property 'ended' of undefined

I am using this module with split2 and I consistently get this error.

 if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
           ^

TypeError: Cannot read property 'ended' of undefined
    at DestroyableTransform.Object.<anonymous>.Writable.write (/phantomjs-node/node_modules/through2/node_modules/readable-stream/lib/_stream_writable.js:239:12)
    at Socket.ondata (_stream_readable.js:555:20)
    at emitOne (events.js:96:13)
    at Socket.emit (events.js:188:7)
    at readableAddChunk (_stream_readable.js:176:18)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at Pipe.onread (net.js:551:20)
npm ERR! Test failed.  See above for more details.

I haven't found any other issues similar to this. Does anybody have any idea what is going on?

strange issue with highWaterMark

Most likely I am doing something wrong, but the following code does not do what I would expect (write out all of the objects). What happens instead is that writing to the file stops after 16 items are written. It seems that the through2 object stream hits the default highWaterMark limit of 16 and stops handling new writes.

var through2 = require('through2');
var fs = require('fs');

var file_stream = fs.createWriteStream(__dirname + '/test.txt');

var obj_stream = through2.obj(function(chunk, enc, cb) {
    file_stream.write(Buffer(JSON.stringify(chunk) + '\n'));
    cb(null, chunk);
});

var i = 0;
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });

// these are never written
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });
obj_stream.write({ foo: ++i });

Is there a way to know if the stream is "blocked" or "waiting for flush" and similarly, what would be causing this if I am calling the transform callback?
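
A hedged sketch of the backpressure angle (an assumption about the cause, not a confirmed diagnosis): nothing ever reads from obj_stream, so once its readable buffer reaches the object-mode highWaterMark of 16 the transform stops being invoked for further writes. Consuming the readable side lets the remaining writes flow through.

obj_stream.resume();                       // or pipe it somewhere downstream
// obj_stream.on('data', function () {})   // a 'data' listener would have the same effect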

Add data to end callback

Is there a reason you don't allow writing data to the flush callback, like with the transform callback?

performance issues?

I just built https://github.com/loveencounterflow/basic-stream-benchmarks to get a handle on how heavy stream transforms are. The code is quite simple:

input         = FS.createReadStream   input_path
output        = FS.createWriteStream  output_path
#.........................................................................................................
p = input
p = p.pipe $split()
#.........................................................................................................
p = p.pipe through2.obj ( data, encoding, callback ) ->
  unless S.t0?
    start_profile S
    S.t0 ?= Date.now()
  S.byte_count += data.length
  S.item_count += +1
  @push data
  callback()
#.........................................................................................................
for _ in [ 1 .. n ] by +1
  if mode is 'sync'
    p = p.pipe through2.obj ( data, encoding, callback ) -> @push data; callback()
  else
    p = p.pipe through2.obj ( data, encoding, callback ) -> setImmediate => @push data; callback()
#.........................................................................................................
p = p.pipe through2 ( data, encoding, callback ) -> @push data + '\n'; callback()
p = p.pipe output

Now I may be totally wrong here and maybe there's some blunder I committed (in a way I hope so), but if the numbers are correct then adding a single stream transform with through2 may cost you almost a second (depending on hardware, the amount of data processed and so on).

Now I'm aware that you normally won't build pipelines with hundreds of stream transforms as I do for the test, but shouldn't a no-op transform weigh in more like a no-op function call, i.e. orders of magnitude below a whole second?

The flamegraphs I produced (and that you can produce after cloning and e.g. running in Chrome DevTools)
would indicate that indeed most of the time is spent in through2 and readable-stream code (have a look at them over at https://github.com/loveencounterflow/basic-stream-benchmarks#flamegraphs).

Now I'm aware that the tests I ran process a file with ~50k lines, so each transform is called that often. Still it means that just in order to work through a run-of-the-mill 1.4MB text file I have to be prepared that adding just 10 steps to the processing will make the time required go from 4 seconds to 10 seconds just for the overhead of it. Compare that to how fast I can, say, grep the same file with a regex (time says 0m0.006s).

Any ideas how to improve the performance story?

through2 does not support readableObjectMode and writableObjectMode

through2 has a dependency on:
"name": "readable-stream",
"version": "2.0.2",
"description": "Streams3, a user-land copy of the stream library from iojs v2.x"

This does not support the split-mode options readableObjectMode and writableObjectMode that are now available on native streams in node 0.12.7. This makes it impossible to create a transformer that reads objects and writes buffers.

Converter to object mode?

Greetings. Perhaps this is a stupid n00b question, but...

I've got a library, gulp, that is using through2 in normal (binary?) stream mode, and I'd like to pass my stream through another library, exorcist, which is using through2 in objectMode.

Is there any way to reconcile these streams? Something I could pipe it through?

Not signalling `end` event?

I would expect the following test to work, but it does not.

var through2 = require('through2');
var vfs = require('vinyl-fs');

it('passes a file along with null contents', function(done) {
  vfs.src('test/fixtures/foo.js')
    .pipe(through2.obj())
    .on('end', () => {
      done();
    });
});

Is this an issue with through2 or is there no issue at all? What should I expect? I'm thinking the end event should be fired.
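
A hedged guess in code form (assuming the stream simply isn't being consumed): 'end' is only emitted once the readable side has been read, so attaching a 'data' listener or calling resume() before expecting 'end' may be all that's missing.

it('passes a file along with null contents', function (done) {
  vfs.src('test/fixtures/foo.js')
    .pipe(through2.obj())
    .on('data', function () {})   // consume so 'end' can be emitted
    .on('end', done);
});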

through2 event for flush function

Hello.
I've got some difficulties with the flush callback. How can I know that it has finished everything? Is there an event for that?
Here's an example:

var files = [];
var stream = through2.obj((file, enc, cb) => {
    files.push(file)
    cb();
}, (cb) => {
    setTimeout(() => {
      console.log('we still inside flush');
      cb();
    }, 100);
    console.log('flush');
});

stream.on('finish', function () {
    console.log('finish event');
})

This code will output:

flush
finish event
we still inside flush

But I need:

flush
we still inside flush
finish event

How can I achieve that with the existing API?
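
A hedged sketch, leaning on the stream docs quoted in an earlier issue ('end' fires after the _flush callback has been called): listening on the readable side, and making sure it is consumed, should give the desired ordering. This is an assumption to verify against your through2 version, not an official answer. The stream variable is the one declared in the snippet above.

stream.resume();                          // ensure the readable side flows
stream.on('end', function () {
  console.log('flush has completed');     // expected to fire after cb() inside flush
});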

Writable stream API partially missing

The following methods (from the stream.Writable interface) are not exposed on a through2 stream:

Given that through2 is just a wrapper around a Transform, a Transform is just a Duplex stream, and a Duplex stream is a:

... stream that implement[s] both the Readable and Writable interfaces.

What am I missing?

Arrow function leaves `this` undefined in code block.

Any clarification on why this is happening?

Misc specs

Node version: 5.6.0
strict-mode: enabled via 'use strict'
through2 version: 2.0.1

The Good 👍

Works fine with keyword Function declaration:

.pipe(Through2(function(chunk, enc, callback) {
        this.push(chunk)

        callback();
}))

The Bad 👎

Breaks when using an ES2015 Arrow Function:

.pipe(Through2((chunk, enc, callback) => {
        this.push(chunk)

        callback();
}))

The Ugly 🐛

The error displayed in my CLI:

this.push(results)
            ^

TypeError: Cannot read property 'push' of undefined
  at: ...

Searched through Issues and I did not find anything related. Posting here as reference for anyone that may experience the issue in the future.

Aloha.
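
A hedged follow-up sketch: arrow functions capture the enclosing `this`, so through2 cannot bind the stream to it. If an arrow function is preferred, one way to avoid `this` entirely is the callback's second argument, which queues the chunk just like this.push() would.

const Through2 = require('through2')

const passAlong = Through2((chunk, enc, callback) => {
  callback(null, chunk)   // no `this` needed
})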

What is the purpose of DestroyableTransform?

I have implemented a through2-like transform stream function wrapper that uses a simpler interface than through2 here.

While in through2 you do

fs.createReadStream('ex.txt')
  .pipe(thru(function (chunk, enc, callback) {
    for (var i = 0; i < chunk.length; i++)
      if (chunk[i] == 97)
        chunk[i] = 122 // swap 'a' for 'z'

    this.push(chunk)

    callback()
   }))
  .pipe(fs.createWriteStream('out.txt'))

In mine you could:

fs.createReadStream("ex.txt")
  .pipe(thru(function (chunk) {
    return chunk.map(function (value) {
      return (value == 97) ? 122 : value
    })
   }))
  .pipe(fs.createWriteStream("out.txt"))

I find this syntax (not necessarily adding the custom .map() function to the chunk object, but the return style of passing data) simpler, but there is no notion of destroying anything in my implementation.

  1. Why do I need this?
  2. What do you think of yet another through version that accommodates this syntax?
