Comments (4)
Thanks for sharing your solutions! But I couldn't see the repository, it's 404 :(
from socket.io-stream.
Sorry, i forgot it was a private repo. I should have really made it into a separate public module.
The code:
'use strict';
/*
* StreamManager
* Manage stream storage and optionally sharing streams over
* multiple node.js instances using tcp sockets
*/
var net = require('net');
var debug = require('debug')('cloud:ws:StreamManager');
var async = require('async');
module.exports = StreamManager;
function StreamManager() {
if (!(this instanceof StreamManager)) return new StreamManager();
var _self = this;
var _streamsData = {};
var _tcpServerPort;
var _tcpServerHost;
this.init = function(port, host) {
_tcpServerPort = port;
_tcpServerHost = host;
startTCPServer();
};
this.add = function(id, stream, timeout) {
var streamData = new StreamData(id, stream);
var self = this;
// remove possibly existing stream
if (_streamsData[id] !== undefined) {
self.remove(id);
}
// store new
_streamsData[streamData.id] = streamData;
debug('new stream: ', streamData.id, '(num streams: ', self.getNumStreams(), ')');
stream.setMaxListeners(0);
var orgRead = stream._read;
stream._read = function() {
debug('_read: ', id);
return orgRead.apply(this, arguments);
};
var orgPush = stream.push;
stream.push = function(chunk) {
debug('push: ', id, chunk? chunk.length : chunk);
if (!streamData.streaming && chunk !== null) {
debug('streaming: ', id);
streamData.streaming = true;
}
return orgPush.apply(this, arguments);
};
stream.on('end', function() {
debug('end: ', id);
self.remove(id);
});
stream.on('error', function(err) {
debug('error: ', id, err);
self.remove(id);
});
if(timeout > 0) {
streamData.timeout = setTimeout(function() {
debug('timeout: ',id);
self.remove(id);
},timeout);
}
};
this.remove = function(id) {
var streamData = _streamsData[id];
if (streamData === undefined) return;
debug('remove: ', id);
streamData.stream.end();
//stream.removeAllListeners();
if(streamData.timeout !== undefined) clearTimeout(streamData.timeout);
delete _streamsData[id];
debug('removed stream: ', id, '(num streams: ', this.getNumStreams(), ')');
};
/**
* Get stream
* By suppling a tcpPort streams can be retrieved from other Node.js instances over a tcp socket.
* @param {string} id
* @param {number} [tcpPort]
*/
this.get = function(id, tcpPort, callback) {
if (typeof tcpPort === 'function') {
callback = tcpPort;
tcpPort = undefined;
}
//debug('get: ',id,tcpPort || '');
var self = this;
if (tcpPort === undefined || _tcpServerPort === tcpPort) {
// retrieve locally
var stream = _streamsData[id] ? _streamsData[id].stream : undefined;
if (!stream) {
callback(new Error('Stream \'' + id + '\' not found'));
} else {
var alreadyStreaming = self.isStreaming(id);
if (alreadyStreaming) {
callback(new Error('Stream \'' + id + '\' was already streaming'));
} else {
// Switch the stream into flowing-mode
// this allows piping the one stream to multiple streams
// it does mean clients will lose data if they don't
// handle the stream right away.
stream.resume();
callback(null, stream);
}
}
} else {
// connect to other Node.js instance's tcp server (assuming same host)
// FUTURE: allow other hosts
getRemoteStream(id, tcpPort, _tcpServerHost, callback);
}
};
function getRemoteStream(id, port, host, callback) {
var client = new net.Socket();
debug('TCP client: connecting to: ' + host + ':' + port);
client.connect(port, host, function() {
debug('TCP client: connected to: ' + port);
// specifing the stream we're interested in
client.write(JSON.stringify({read: id}));
// wait for response
client.once('data', function(data) {
debug('TCP client: response: ' + data);
try {
data = JSON.parse(data);
} catch(err) {
return callback(err);
}
if (data.error !== undefined) {
callback(new Error(data.error));
} else {
callback(null, client);
}
});
});
client.on('error', function(err) {
debug('TCP client error: ', err);
});
client.on('end', function() {
debug('TCP client ', port, 'end: ', id);
});
}
this.isStreaming = function(id) {
if (_streamsData[id] === undefined) return false;
else return _streamsData[id].streaming;
};
this.getNumStreams = function() {
return Object.keys(_streamsData).length;
};
function startTCPServer() {
debug('start TCP server: ', _tcpServerHost, _tcpServerPort);
// create tcp server to serve streams
var server = net.createServer();
// when there is a new tcp client connected
server.on('connection', onTCPClient);
server.on('error', function(err) {
debug('TCP server: error: ', err);
});
server.listen(_tcpServerPort, _tcpServerHost, function() {
debug('TCP Server: listening on ' + server.address().address + ':' + server.address().port);
});
}
function onTCPClient(sock) {
debug('TCP server: new connection: ' + sock.remoteAddress + ':' + sock.remotePort);
// listen for some data once
sock.once('data', function(data) {
debug('TCP server: ' + sock.remoteAddress + ':' + sock.remotePort + ' first data: ' + data);
var streamID;
var stream;
async.series([
// parse json
function(next) {
try {
data = JSON.parse(data);
} catch(err) {
return next(err);
}
if (data.read === undefined) {
return next(new Error('No read specified'));
}
streamID = data.read;
next();
},
// retrieve stream
function(next) {
_self.get(streamID, function(err, retrievedStream) {
stream = retrievedStream;
next(err);
});
}
], function(err) {
debug('TCP server: retrieving stream ', streamID, ' finished: ', err || '');
if (err) {
sock.write(JSON.stringify({error: err.message}));
sock.end();
} else {
sock.write(JSON.stringify({sending: streamID}));
// pipe stored stream to tcp client
// Switch the stream into flowing-mode
// this allows piping the one stream to multiple streams
// it does mean clients will lose data if they don't
// handle the stream right away.
debug('resume');
stream.resume();
stream.pipe(sock);
stream.on('end', function() {
debug('TCP server: stream ', streamID, 'end');
});
}
});
});
sock.on('error', function(err) {
debug('TCP connection error: ', err);
});
}
function StreamData(id, stream) {
this.id = id;
this.stream = stream;
this.streaming = false;
this.timeout;
}
}
from socket.io-stream.
I think limit the number of streams and timeout are awesome 🏆 even if it's not offered as API. Without that, malicious users can easily send a lot of streams if they want :(
from socket.io-stream.
Thanks. I don't think this should be a part of socker.io-stream itself, but maybe someone can turn it into a separate module, also because it isn't even socket.io-stream specific. I'll close this issue, this was just a way to share some solutions.
from socket.io-stream.
Related Issues (20)
- Is this project still active? HOT 7
- Streaming to Android
- server > client HOT 1
- Simpler API
- MediaStream HOT 11
- Can you upload a folder?
- events.js:62 Uncaught Error: Uncaught, unspecified "error" event. (Authentication error) HOT 1
- Able to send/receive a native browser stream HOT 1
- An error when using Socket.io-stream with Electron.
- A port to other languages
- Stream interrupted file
- Uncaught TypeError: Cannot read property '_read' of null HOT 3
- Server dont recive emit or client dont emit HOT 3
- Send audio file from node.js server to another node.js server
- Client iOS
- This version of debug is vulnerable to ReDos attacks HOT 3
- In browser console getting error
- [Deprecation] SharedArrayBuffer
- add a settimeout to it to reduce cpu consumption
- Unfortunately there is no support for rooms and broadcasting for now.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from socket.io-stream.