Comments (17)

peteruithoven commented on July 4, 2024

The same issue can be reproduced without socket.io-stream by making a writable stream that doesn't write.

var debug         = require('debug')('streams');
var fs            = require('fs');
var filename      = "image.jpg";
var readStream    = fs.createReadStream("assets/"+filename);

// one working stream that should write to file
readStream.pipe(fs.createWriteStream(0+"-"+filename));

// one fake stream that doesn't do anything
var Writable = require('stream').Writable;
var nonWritingStream = new Writable();
nonWritingStream._write = function(chunk, encoding, callback) {
  debug("write");
//  callback(); // calling this callback also unblocks the other write stream
};
readStream.pipe(nonWritingStream);

from socket.io-stream.

peteruithoven commented on July 4, 2024

Is there a way to detect that a client has started handling the stream? I think that could solve my problem.
I tried adding the pipe after receiving the internal $stream-read event (from the original socket.io socket), only for the first such event. But this breaks the streams to all but the first client; it looks like they miss the first chunk (or a few more).

nkzawa commented on July 4, 2024

Basically, you can't reuse a piped stream:

http://stackoverflow.com/questions/19553837/node-js-piping-the-same-stream-into-multiple-writable-targets

peteruithoven commented on July 4, 2024

But it looks like you can have parallel consuming streams, just as long as you pipe the stream to them at the same time.
I'll try to wait a cycle (nextTick) after the first read event and then start piping to all the sockets that I received a read event from.
Another thing I'll try is to collect the consumers after they started reading, but then they might miss the first emit.

nkzawa commented on July 4, 2024

I'm not sure, but how about copying the data to the other streams manually, without the pipe method?

incoming.on('data', function(chunk) {
  outgoing1.write(chunk);
  outgoing2.write(chunk);
});
incoming.resume(); // redundant once a 'data' listener is attached, but harmless

peteruithoven commented on July 4, 2024

Well, the main problem is that the clients don't start reading in parallel; there will always be an unpredictable delay between a client receiving an event and reading from the stream (if it does at all). The following construction seems to work, but is quite ugly.

var socketClient  = require('socket.io-client');
var debug         = require('debug')('streams');
var ss            = require("socket.io-stream");
var fs            = require('fs');
var PORT          = 5000;
var URL           = "http://localhost:"+PORT;

server();
consumer("a",true,true);
consumer("b",true,false);
consumer("c",false,false);

// SERVER
function server() {
  var http          = require('http').Server();
  var io            = require('socket.io')(http);
  http.listen(PORT, function(){
    debug('server listening on *:' + PORT);
  });
  var nsp = io.of('/');
  var consumers = [];
  nsp.on('connection', function(socket) {
    var query = socket.handshake.query;
    debug("server: new connection: ",query.name);
    var streamingSocket = ss(socket);
    consumers.push(streamingSocket);
    socket.once('$stream-read',function() {
      debug('  $stream-read ',arguments);
      streamingSocket.reading = true;
    });
  });
  // after an arbitrary delay the server sends an image around
  setTimeout(function() {
    debug("broadcast image");
    var filename = "image.jpg";
    var readStream = fs.createReadStream("assets/"+filename);

    consumers.forEach(function(consumer,index) {
      debug("  consumer: ",index);
      var outgoingStream = ss.createStream();
      consumer.emit('file',outgoingStream,{name:filename});
      debug("  consumer.reading: ",consumer.reading);
      // a delay so that all consumers have a chance to start reading
      setTimeout(function() {
        debug("  >consumer.reading: ",consumer.reading);
        if(consumer.reading) {
          readStream.pipe(outgoingStream);
        }
      },500);
    });
  },500); 
}

// CLIENTS
function consumer(name,listen,write) {
  var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
  var serverStreamSocket = ss(serverSocket);
  serverSocket.once('connect', function(){
    debug("consumer "+name+" connected");
    if(listen) {
      serverStreamSocket.on("file",function(stream,data) {
        debug("consumer on file");
        if(write) {
          stream.pipe(fs.createWriteStream(name+"-"+data.name));
        }
      });
    }
  });
}

A compromise might be that all but the first connecting client will receive a partial first event.
Or maybe there is a way to send a "probe" stream, to check who's reading.

peteruithoven commented on July 4, 2024

So the following also works, but then the first emit always results in a 0-byte file.

var socketClient  = require('socket.io-client');
var debug         = require('debug')('streams');
var ss            = require("socket.io-stream");
var fs            = require('fs');
var PORT          = 5000;
var URL           = "http://localhost:"+PORT;

server();
consumer("a",true,true);
consumer("b",true,true);
consumer("c",false,false);

// SERVER
function server() {
  var http          = require('http').Server();
  var io            = require('socket.io')(http);
  http.listen(PORT, function(){
    debug('server listening on *:' + PORT);
  });
  var nsp = io.of('/');
  var consumers = [];
  nsp.on('connection', function(socket) {
    var query = socket.handshake.query;
    debug("server: new connection: ",query.name);
    var streamingSocket = ss(socket);
    consumers.push(streamingSocket);
    socket.once('$stream-read',function() {
      debug('  $stream-read ',arguments);
      streamingSocket.reading = true;
    });
  });
  // at an arbitrary interval the server sends an image around
  setInterval(function() {
    debug("broadcast image");
    var filename = "image.jpg";
    var readStream = fs.createReadStream("assets/"+filename);

    consumers.forEach(function(consumer,index) {
      debug("  consumer: ",index);
      var outgoingStream = ss.createStream();
      consumer.emit('file',outgoingStream,{name:filename});
      debug("  consumer.reading: ",consumer.reading);
      if(consumer.reading) {
        readStream.pipe(outgoingStream);
      }
    });
  },2000); 
}

// CLIENTS
function consumer(name,listen,write) {
  var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
  var serverStreamSocket = ss(serverSocket);
  var counter = 0;
  serverSocket.once('connect', function(){
    debug("consumer "+name+" connected");
    if(listen) {
      serverStreamSocket.on("file",function(stream,data) {
        debug("consumer on file");
        if(write) {
          stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
          counter++;
        }
      });
    }
  });
}

peteruithoven commented on July 4, 2024

Another, slightly better, solution: unpiping the ones that don't seem to be listening.

var socketClient  = require('socket.io-client');
var debug         = require('debug')('streams');
var ss            = require("socket.io-stream");
var fs            = require('fs');
var PORT          = 5000;
var URL           = "http://localhost:"+PORT;
var STREAM_EVENT  = ss.Socket.event;

server();
consumer("a",true,true);
consumer("b",false,false);
setTimeout(function() {
  consumer("c",true,true);
  setTimeout(function() {
    consumer("d",true,false);
  },3000);
},3000);


// SERVER
function server() { 
  var http          = require('http').Server();
  var io            = require('socket.io')(http);
  http.listen(PORT, function(){
    debug('server listening on *:' + PORT);
  });
  var nsp = io.of('/');
  var consumers = [];
  nsp.on('connection', function(socket) {
    var query = socket.handshake.query;
    debug("server: new connection: ",query.name);
    //var streamingSocket = ss(socket);
    consumers.push(socket);
  });
  // at an arbitrary interval the server sends an image around
  setInterval(function() {
    debug("broadcast image");
    var filename = "image.jpg";
    var readStream = fs.createReadStream("assets/"+filename);
    consumers.forEach(function(consumer,index) {
      debug("  pipe to consumer: ",index);
      var outgoingStream = ss.createStream();
      ss(consumer).emit('file',outgoingStream,{name:filename});
      readStream.pipe(outgoingStream);

      var reading = false;
      consumer.once(STREAM_EVENT+'-read',function() {
        debug(index+': $stream-read ');
        reading = true;
      });
      setTimeout(function() {
        debug(index+": reading: ",reading);
        if(!reading) {
          readStream.unpipe(outgoingStream);
          debug("  unpipe to consumer: ",index);
        }
      },100);
    });
  },2000); 
}

// CLIENTS
function consumer(name,listen,write) {
  var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
  var serverStreamSocket = ss(serverSocket);
  var counter = 0;
  serverSocket.once('connect', function(){
    debug("consumer: "+name+" connected");
    if(listen) {
      serverStreamSocket.on("file",function(stream,data) {
        debug("consumer: on file");
        if(write) {
          stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
          counter++;
        }
      });
    }
  });
}

peteruithoven commented on July 4, 2024

Turning the readable stream into flowing mode, by calling resume() or by adding a data event listener, also seems to fix the problem. This seems like the best solution.

Readable streams have two "modes": a flowing mode and a non-flowing mode. When in flowing mode, data is read from the underlying system and provided to your program as fast as possible. In non-flowing mode, you must explicitly call stream.read() to get chunks of data out.

Example:

var socketClient  = require('socket.io-client');
var debug         = require('debug')('streams');
var ss            = require("socket.io-stream");
var fs            = require('fs');
var PORT          = 5000;
var URL           = "http://localhost:"+PORT;

server();
consumer("a",true,true);
consumer("b",false,false);
consumer("c",true,true);
setTimeout(function() {
  consumer("d",true,true);
  setTimeout(function() {
    consumer("e",true,false);
  },3000);
},3000);


// SERVER
function server() { 
  var http          = require('http').Server();
  var io            = require('socket.io')(http);
  http.listen(PORT, function(){
    debug('server listening on *:' + PORT);
  });
  var nsp = io.of('/');
  var consumers = [];
  nsp.on('connection', function(socket) {
    var query = socket.handshake.query;
    debug("server: new connection: ",query.name);
    //var streamingSocket = ss(socket);
    consumers.push(socket);
  });
  // at an arbitrary interval the server sends an image around
  setInterval(function() {
    debug("broadcast image");
    var filename = "image.jpg";
    var readStream = fs.createReadStream("assets/"+filename);
    readStream.resume(); // switch the stream into flowing mode
    consumers.forEach(function(consumer,index) {
      debug("  pipe to consumer: ",index);
      var outgoingStream = ss.createStream();
      ss(consumer).emit('file',outgoingStream,{name:filename});
      readStream.pipe(outgoingStream);
    });
  },2000); 
}

// CLIENTS
function consumer(name,listen,write) {
  var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
  var serverStreamSocket = ss(serverSocket);
  var counter = 0;
  serverSocket.once('connect', function(){
    debug("consumer: "+name+" connected");
    if(listen) {
      serverStreamSocket.on("file",function(stream,data) {
        debug("consumer: on file");
        if(write) {
          stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
          counter++;
        }
      });
    }
  });
}

arlando commented on July 4, 2024

Had this issue today too but managed to work around:

var express = require('express');
var app = express();
var server = require('http').Server(app);
var io = require('socket.io')(server);
var ss = require("socket.io-stream");
var TextStream = require('./textstream');
var _textstream;
var consumers = {};

server.listen(8080);

app.use(express.static(__dirname + '/flood'));


io.on('connection', function (socket) {
    consumers[socket.handshake.query.t] = socket;
    addStreamToSockets();

    socket.on('disconnect', function () {
        delete consumers[socket.handshake.query.t];
    })
});

function makeOutgoingStream(consumer) {
    var outgoing = ss.createStream();

    consumer.OUTGOING = outgoing;
    ss(consumer).emit('script', outgoing);
    return outgoing;
}

function destroyOutgoingStream(consumer) {
    if (consumer.OUTGOING) {
        consumer.OUTGOING.destroy();
        consumer.OUTGOING = null;
    }
}

function addStreamToSockets() {
    _textstream = new TextStream();
    var consumerKeys = Object.keys(consumers);
    var numberOfConsumers = consumerKeys.length;

    for (var i = 0; i < numberOfConsumers; i++) {
        var key = consumerKeys[i];
        addStreamToSocket(consumers[key])
    }
}

function addStreamToSocket(consumer) {
    destroyOutgoingStream(consumer);
    _textstream.pipe(makeOutgoingStream(consumer))
}

The main difference, @peteruithoven, is that I had to destroy the previous stream on the socket before binding it; also, this just adds the socket to the new stream when a new connection is made.

  • For some reason, if I instantiate the stream and keep it around in scope, it doesn't work (I think the 'readable' event is being fired and not the 'data' event).
  • Another thing is that you need to unbind the receiving socket from the old stream, or you will keep adding streams that are piping to the client. I think if we keep working on it we may figure something elegant out.

Ideally I would want to instantiate the stream once and be able to add and remove clients at will.

peteruithoven commented on July 4, 2024

Why abort an ongoing stream for a new one? Aren't you afraid your consumers will never be able to complete a stream because it keeps getting aborted, when a consumer is behind a bad internet connection for example?

I'm now trying to set up my system in such a way that a new stream isn't accepted while there is still a stream being consumed. This also functions as a throttling system. The problem is that I need to listen to the internal read event, and I'm not yet sure how to detect that something went wrong while sending the stream.

arlando commented on July 4, 2024

Mmm, you are right, aborting an ongoing stream isn't ideal. Ideally the stream would stop, add the new listener, and then resume. I am piping a stream to an output X on listener X, but when listener Y joins, output Y cannot join the ongoing stream for some reason, unless I instantiate a new stream and do the steps above.

I am going to be streaming charting data ( x,y values) so some loss is ok. If I were streaming an image or a file then I could see why not verifying the stream was finished would be bad.

As for errors, wouldn't this work: http://nodejs.org/api/stream.html#stream_event_error? Can't you bubble an event to the socket on error?

peteruithoven commented on July 4, 2024

I'm afraid I don't understand your use case well enough to give much advice.
You can only pipe the stream to multiple consumers when you start piping at the same time.

Maybe I can use the error event, I've opened an issue to clarify some stuff:
#37 (comment)

peteruithoven commented on July 4, 2024

Using resume to put the stream into flowing mode works for me, so I'll close this issue.

waltergms commented on July 4, 2024

How do I stop a stream without disconnecting the client socket? I'm using ss in a chat system and I can't disconnect the client; I just need to give the client an option to cancel the upload before it ends.

peteruithoven commented on July 4, 2024

Isn't this a separate question? Why not call end() on the stream instance?

jenil04 commented on July 4, 2024

I'm trying to transfer and store files from one socket to another. The stored files are saved in a Merkle tree. How do I save this data stream into a Merkle tree?
