Comments (17)
The same issue can be reproduces without socket.io-stream by making a writable stream that doesn't write.
var debug = require('debug')('streams');
var fs = require('fs');
var filename = "image.jpg";
var readStream = fs.createReadStream("assets/"+filename);
// one working stream that should write to file
readStream.pipe(fs.createWriteStream(0+"-"+filename));
// one fake stream that doesn't do anthing
var nonWritingStream = new require('stream').Writable();
nonWritingStream._write = function(chunk, encoding, callback) {
debug("write");
// callback(); // calling this callback also enables the other write stream
};
readStream.pipe(nonWritingStream);
from socket.io-stream.
Is there a way to detect that a client started handling the stream? I think that could solve my problem.
I tried adding the pipe after I received the internal $stream-read event (from the original socket.io socket) (only the first time). But this breaks streams to all but the first client. It looks like they miss the first (or a few more) chunks.
from socket.io-stream.
Basically, you can't reuse piped stream:
from socket.io-stream.
But it looks like you can have parallel consuming streams, just as long as you pipe the stream to them at the same time.
I'll try to wait a cycle (nextTick) after the first read event and then start piping to all the socket's that I received a read event form.
Another thing I'll try is to collect the 'customers' after they started reading, but then they might miss the first emit.
from socket.io-stream.
I'm not sure, but how about to copy data to other streams manually without pipe
method?
incoming.on('data', function(chunk) {
outgoing1.write(chunk);
outgoing2.write(chunk);
});
incoming.resume();
from socket.io-stream.
Well the main problem is that the clients don't start reading in parallel, there will always be a unpredictable delay in a client receiving a event and reading to the stream (if they do). The following construction seems to work, but is quite ugly.
var socketClient = require('socket.io-client');
var debug = require('debug')('streams');
var ss = require("socket.io-stream");
var fs = require('fs');
var PORT = 5000;
var URL = "http://localhost:"+PORT;
server();
consumer("a",true,true);
consumer("b",true,false);
consumer("c",false,false);
// SERVER
function server() {
var http = require('http').Server();
var io = require('socket.io')(http);
http.listen(PORT, function(){
debug('server listening on *:' + PORT);
});
var nsp = io.of('/');
var consumers = [];
nsp.on('connection', function(socket) {
var query = socket.handshake.query;
debug("server: new connection: ",query.name);
var streamingSocket = ss(socket);
consumers.push(streamingSocket);
socket.once('$stream-read',function() {
debug(' $stream-read ',arguments);
streamingSocket.reading = true;
});
});
// after a arbitrary delay the server sends an image arround
setTimeout(function() {
debug("broadcast image");
var filename = "image.jpg";
var readStream = fs.createReadStream("assets/"+filename);
consumers.forEach(function(consumer,index) {
debug(" consumer: ",index);
var outgoingStream = ss.createStream();
consumer.emit('file',outgoingStream,{name:filename});
debug(" consumer.reading: ",consumer.reading);
// a delay so that all consumers have a change to start reading
setTimeout(function() {
debug(" >consumer.reading: ",consumer.reading);
if(consumer.reading) {
readStream.pipe(outgoingStream);
}
},500);
});
},500);
}
// CLIENTS
function consumer(name,listen,write) {
var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
var serverStreamSocket = ss(serverSocket);
serverSocket.once('connect', function(){
debug("consumer "+name+" connected");
if(listen) {
serverStreamSocket.on("file",function(stream,data) {
debug("consumer on file");
if(write) {
stream.pipe(fs.createWriteStream(name+"-"+data.name));
}
});
}
});
}
A compromise might be that all but the first connecting client will receive a partial first event.
Or maybe there is a way to send a "probe" stream, to check who's reading.
from socket.io-stream.
So the following also works, but then the first emit always results in a 0 byte file.
var socketClient = require('socket.io-client');
var debug = require('debug')('streams');
var ss = require("socket.io-stream");
var fs = require('fs');
var PORT = 5000;
var URL = "http://localhost:"+PORT;
server();
consumer("a",true,true);
consumer("b",true,true);
consumer("c",false,false);
// SERVER
function server() {
var http = require('http').Server();
var io = require('socket.io')(http);
http.listen(PORT, function(){
debug('server listening on *:' + PORT);
});
var nsp = io.of('/');
var consumers = [];
nsp.on('connection', function(socket) {
var query = socket.handshake.query;
debug("server: new connection: ",query.name);
var streamingSocket = ss(socket);
consumers.push(streamingSocket);
socket.once('$stream-read',function() {
debug(' $stream-read ',arguments);
streamingSocket.reading = true;
});
});
// after a arbitrary delay the server sends an image arround
setInterval(function() {
debug("broadcast image");
var filename = "image.jpg";
var readStream = fs.createReadStream("assets/"+filename);
consumers.forEach(function(consumer,index) {
debug(" consumer: ",index);
var outgoingStream = ss.createStream();
consumer.emit('file',outgoingStream,{name:filename});
debug(" consumer.reading: ",consumer.reading);
if(consumer.reading) {
readStream.pipe(outgoingStream);
}
});
},2000);
}
// CLIENTS
function consumer(name,listen,write) {
var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
var serverStreamSocket = ss(serverSocket);
var counter = 0;
serverSocket.once('connect', function(){
debug("consumer "+name+" connected");
if(listen) {
serverStreamSocket.on("file",function(stream,data) {
debug("consumer on file");
if(write) {
stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
counter++;
}
});
}
});
}
from socket.io-stream.
Another slightly better solution; unpiping the ones that don't seem to listen.
var socketClient = require('socket.io-client');
var debug = require('debug')('streams');
var ss = require("socket.io-stream");
var fs = require('fs');
var PORT = 5000;
var URL = "http://localhost:"+PORT;
var STREAM_EVENT = ss.Socket.event;
server();
consumer("a",true,true);
consumer("b",false,false);
setTimeout(function() {
consumer("c",true,true);
setTimeout(function() {
consumer("d",true,false);
},3000);
},3000);
// SERVER
function server() {
var http = require('http').Server();
var io = require('socket.io')(http);
http.listen(PORT, function(){
debug('server listening on *:' + PORT);
});
var nsp = io.of('/');
var consumers = [];
nsp.on('connection', function(socket) {
var query = socket.handshake.query;
debug("server: new connection: ",query.name);
//var streamingSocket = ss(socket);
consumers.push(socket);
});
// at a arbitrary interval the server sends an image arround
setInterval(function() {
debug("broadcast image");
var filename = "image.jpg";
var readStream = fs.createReadStream("assets/"+filename);
consumers.forEach(function(consumer,index) {
debug(" pipe to consumer: ",index);
var outgoingStream = ss.createStream();
ss(consumer).emit('file',outgoingStream,{name:filename});
readStream.pipe(outgoingStream);
var reading = false;
consumer.once(STREAM_EVENT+'-read',function() {
debug(index+': $stream-read ');
reading = true;
});
setTimeout(function() {
debug(index+": reading: ",reading);
if(!reading) {
readStream.unpipe(outgoingStream);
debug(" unpipe to consumer: ",index);
}
},100);
});
},2000);
}
// CLIENTS
function consumer(name,listen,write) {
var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
var serverStreamSocket = ss(serverSocket);
var counter = 0;
serverSocket.once('connect', function(){
debug("consumer: "+name+" connected");
if(listen) {
serverStreamSocket.on("file",function(stream,data) {
debug("consumer: on file");
if(write) {
stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
counter++;
}
});
}
});
}
from socket.io-stream.
Turning the readable stream into flowing-mode by calling resume() or by adding a data event listener also seems to fix the problem. This seems like the best solution.
Readable streams have two "modes": a flowing mode and a non-flowing mode. When in flowing mode, data is read from the underlying system and provided to your program as fast as possible. In non-flowing mode, you must explicitly call stream.read() to get chunks of data out.
Example:
var socketClient = require('socket.io-client');
var debug = require('debug')('streams');
var ss = require("socket.io-stream");
var fs = require('fs');
var PORT = 5000;
var URL = "http://localhost:"+PORT;
server();
consumer("a",true,true);
consumer("b",false,false);
consumer("c",true,true);
setTimeout(function() {
consumer("d",true,true);
setTimeout(function() {
consumer("e",true,false);
},3000);
},3000);
// SERVER
function server() {
var http = require('http').Server();
var io = require('socket.io')(http);
http.listen(PORT, function(){
debug('server listening on *:' + PORT);
});
var nsp = io.of('/');
var consumers = [];
nsp.on('connection', function(socket) {
var query = socket.handshake.query;
debug("server: new connection: ",query.name);
//var streamingSocket = ss(socket);
consumers.push(socket);
});
// at an arbitrary interval the server sends an image arround
setInterval(function() {
debug("broadcast image");
var filename = "image.jpg";
var readStream = fs.createReadStream("assets/"+filename);
readStream.resume(); // switch the stream into flowing-mode
consumers.forEach(function(consumer,index) {
debug(" pipe to consumer: ",index);
var outgoingStream = ss.createStream();
ss(consumer).emit('file',outgoingStream,{name:filename});
readStream.pipe(outgoingStream);
});
},2000);
}
// CLIENTS
function consumer(name,listen,write) {
var serverSocket = socketClient(URL+"?name="+name,{forceNew:true});
var serverStreamSocket = ss(serverSocket);
var counter = 0;
serverSocket.once('connect', function(){
debug("consumer: "+name+" connected");
if(listen) {
serverStreamSocket.on("file",function(stream,data) {
debug("consumer: on file");
if(write) {
stream.pipe(fs.createWriteStream(name+"-"+counter+"-"+data.name));
counter++;
}
});
}
});
}
from socket.io-stream.
Had this issue today too but managed to work around:
var express = require('express');
var app = express();
var server = require('http').Server(app);
var io = require('socket.io')(server);
var ss = require("socket.io-stream");
var TextStream = require('./textstream');
var _textstream;
var consumers = {};
server.listen(8080);
app.use(express.static(__dirname + '/flood'));
io.on('connection', function (socket) {
consumers[socket.handshake.query.t] = socket;
addStreamToSockets();
socket.on('disconnect', function () {
delete consumers[socket.handshake.query.t];
})
});
function makeOutgoingStream(consumer) {
var outgoing = ss.createStream();
consumer.OUTGOING = outgoing;
ss(consumer).emit('script', outgoing);
return outgoing;
}
function destroyOutgoingStream(consumer) {
if (consumer.OUTGOING) {
consumer.OUTGOING.destroy();
consumer.OUTGOING = null;
}
}
function addStreamToSockets() {
_textstream = new TextStream();
var consumerKeys = Object.keys(consumers);
var numberOfConsumers = consumerKeys.length;
for (var i = 0; i < numberOfConsumers; i++) {
var key = consumerKeys[i];
addStreamToSocket(consumers[key])
}
}
function addStreamToSocket(consumer) {
destroyOutgoingStream(consumer);
_textstream.pipe(makeOutgoingStream(consumer))
}
The main difference @peteruithoven is that I had to destroy the previous stream on the socket before binding it also this just adds the socket to the new stream when a new connection is made.
- For some reason, if I instantiate the stream and keep it around in scope it doesn't work (I think the 'readable' event is being fired and not 'data' event).
- Another thing is you need to unbind the receiving socket from the old stream or you will keep adding streams that are piping to the client. I think if we keep working we may figure something elegant out.
Ideally I would want to instantiate the stream once and be able to add and remove clients at will.
from socket.io-stream.
Why abort a ongoing stream for a new one? Aren't you afraid you're consumers will not be able to complete the streams because they are aborted, when the consumer is behind a bad internet connection for example.
I'm now trying to setup my system in such a way that a new stream isn't accepted when there is still a stream, that is being consumed. This also functions as a throttling system. The problem is that I need to listen to the internal read event and I'm not yet sure how to detect that something went wrong while sending the stream.
from socket.io-stream.
Mmm you are right, aborting an ongoing stream isn't ideal. Ideally the stream would stop, add the new listener, and then resume. I am piping a stream to an output X on listener X, but when listener Y joins output Y cannot join the ongoing the stream for some reason, unless I new the stream and do the steps above.
I am going to be streaming charting data ( x,y values) so some loss is ok. If I were streaming an image or a file then I could see why not verifying the stream was finished would be bad.
As for errors wouldn't this work: http://nodejs.org/api/stream.html#stream_event_error can't you bubble an event to the socket on error?
from socket.io-stream.
I'm afraid I don't understand your use case well enough to give much advice.
You can only pipe the stream to multiple consumers when you start piping at the same time.
Maybe I can use the error event, I've opened an issue to clarify some stuff:
#37 (comment)
from socket.io-stream.
Using resume to put the stream into flowing mode works for me, so I'll close this issue.
from socket.io-stream.
How do i stop stream withou disconnect client socket? Im using the ss on a chat system and i cant disconect the client, i just have to give a option to the client cancel the upload before it ends....
from socket.io-stream.
Isn't this a separate question? Why not call the end()
on the stream instance?
from socket.io-stream.
I'm trying to transfer and store files from one socket to another. The stored files are saved in a Merkle tree. How do I save this data stream into a merkle tree?
from socket.io-stream.
Related Issues (20)
- Is this project still active? HOT 7
- Streaming to Android
- server > client HOT 1
- Simpler API
- MediaStream HOT 11
- Can you upload a folder?
- events.js:62 Uncaught Error: Uncaught, unspecified "error" event. (Authentication error) HOT 1
- Able to send/receive a native browser stream HOT 1
- An error when using Socket.io-stream with Electron.
- A port to other languages
- Stream interrupted file
- Uncaught TypeError: Cannot read property '_read' of null HOT 3
- Server dont recive emit or client dont emit HOT 3
- Send audio file from node.js server to another node.js server
- Client iOS
- This version of debug is vulnerable to ReDos attacks HOT 3
- In browser console getting error
- [Deprecation] SharedArrayBuffer
- add a settimeout to it to reduce cpu consumption
- Unfortunately there is no support for rooms and broadcasting for now.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from socket.io-stream.