Giter Site home page Giter Site logo

Stream managment about socket.io-stream HOT 4 CLOSED

nkzawa avatar nkzawa commented on July 24, 2024
Stream managment

from socket.io-stream.

Comments (4)

nkzawa avatar nkzawa commented on July 24, 2024

Thanks for sharing your solutions! But I couldn't see the repository, it's 404 :(

from socket.io-stream.

peteruithoven avatar peteruithoven commented on July 24, 2024

Sorry, i forgot it was a private repo. I should have really made it into a separate public module.
The code:

'use strict';
/*
 * StreamManager
 * Manage stream storage and optionally sharing streams over
 * multiple node.js instances using tcp sockets
 */
var net     = require('net');
var debug   = require('debug')('cloud:ws:StreamManager');
var async   = require('async');
module.exports = StreamManager;

function StreamManager() {
  if (!(this instanceof StreamManager)) return new StreamManager();

  var _self = this;
  var _streamsData = {};
  var _tcpServerPort;
  var _tcpServerHost;

  this.init = function(port, host) {
    _tcpServerPort = port;
    _tcpServerHost = host;
    startTCPServer();
  };

  this.add = function(id, stream, timeout) {

    var streamData = new StreamData(id, stream);
    var self = this;
    // remove possibly existing stream
    if (_streamsData[id] !== undefined) {
       self.remove(id);
    }

    // store new
    _streamsData[streamData.id] = streamData;
    debug('new stream: ', streamData.id, '(num streams: ', self.getNumStreams(), ')');

    stream.setMaxListeners(0);

    var orgRead = stream._read;
    stream._read = function() {
      debug('_read: ', id);
      return orgRead.apply(this, arguments);
    };
    var orgPush = stream.push;
    stream.push = function(chunk) {
      debug('push: ', id, chunk? chunk.length : chunk);
      if (!streamData.streaming && chunk !== null) {
        debug('streaming: ', id);
        streamData.streaming = true;
      }
      return orgPush.apply(this, arguments);
    };

    stream.on('end', function() {
      debug('end: ', id);
      self.remove(id);
    });
    stream.on('error', function(err) {
      debug('error: ', id, err);
      self.remove(id);
    });

    if(timeout > 0) {
      streamData.timeout = setTimeout(function() {
        debug('timeout: ',id);
        self.remove(id);
      },timeout);
    }
  };

  this.remove = function(id) {
    var streamData = _streamsData[id];
    if (streamData === undefined) return;
    debug('remove: ', id);
    streamData.stream.end();
    //stream.removeAllListeners();
    if(streamData.timeout !== undefined) clearTimeout(streamData.timeout);
    delete _streamsData[id];
    debug('removed stream: ', id, '(num streams: ', this.getNumStreams(), ')');
  };

  /**
   * Get stream
   * By suppling a tcpPort streams can be retrieved from other Node.js instances over a tcp socket.
   * @param {string} id
   * @param {number} [tcpPort]
   */
  this.get = function(id, tcpPort, callback) {
    if (typeof tcpPort === 'function') {
      callback = tcpPort;
      tcpPort = undefined;
    }
    //debug('get: ',id,tcpPort || '');
    var self = this;
    if (tcpPort === undefined || _tcpServerPort === tcpPort) {
      // retrieve locally
      var stream = _streamsData[id] ? _streamsData[id].stream : undefined;
      if (!stream) {
        callback(new Error('Stream \'' + id + '\' not found'));
      } else {
        var alreadyStreaming = self.isStreaming(id);
        if (alreadyStreaming) {
          callback(new Error('Stream \'' + id + '\' was already streaming'));
        } else {
          // Switch the stream into flowing-mode
          // this allows piping the one stream to multiple streams
          // it does mean clients will lose data if they don't
          // handle the stream right away.
          stream.resume();
          callback(null, stream);
        }
      }
    } else {
      // connect to other Node.js instance's tcp server (assuming same host)
      // FUTURE: allow other hosts
      getRemoteStream(id, tcpPort, _tcpServerHost, callback);
    }
  };

  function getRemoteStream(id, port, host, callback) {
    var client = new net.Socket();
    debug('TCP client: connecting to: ' + host + ':' + port);
    client.connect(port, host, function() {
      debug('TCP client: connected to: ' + port);
      // specifing the stream we're interested in
      client.write(JSON.stringify({read: id}));
      // wait for response
      client.once('data', function(data) {
        debug('TCP client: response: ' + data);
        try {
          data = JSON.parse(data);
        } catch(err) {
          return callback(err);
        }
        if (data.error !== undefined) {
          callback(new Error(data.error));
        } else {
          callback(null, client);
        }
      });
    });
    client.on('error', function(err) {
      debug('TCP client error: ', err);
    });
    client.on('end', function() {
      debug('TCP client ', port, 'end: ', id);
    });
  }

  this.isStreaming = function(id) {
    if (_streamsData[id] === undefined) return false;
    else return _streamsData[id].streaming;
  };

  this.getNumStreams = function() {
    return Object.keys(_streamsData).length;
  };

  function startTCPServer() {
    debug('start TCP server: ', _tcpServerHost, _tcpServerPort);
    // create tcp server to serve streams
    var server = net.createServer();
    // when there is a new tcp client connected
    server.on('connection', onTCPClient);
    server.on('error', function(err) {
      debug('TCP server: error: ', err);
    });
    server.listen(_tcpServerPort, _tcpServerHost, function() {
      debug('TCP Server: listening on ' + server.address().address + ':' + server.address().port);
    });
  }

  function onTCPClient(sock) {
    debug('TCP server: new connection: ' + sock.remoteAddress + ':' + sock.remotePort);
    // listen for some data once
    sock.once('data', function(data) {
      debug('TCP server: ' + sock.remoteAddress + ':' + sock.remotePort + ' first data: ' + data);
      var streamID;
      var stream;
      async.series([
        // parse json
        function(next) {
          try {
            data = JSON.parse(data);
          } catch(err) {
            return next(err);
          }
          if (data.read === undefined) {
            return next(new Error('No read specified'));
          }
          streamID = data.read;
          next();
        },
        // retrieve stream
        function(next) {
          _self.get(streamID, function(err, retrievedStream) {
            stream = retrievedStream;
            next(err);
          });
        }
      ], function(err) {
        debug('TCP server: retrieving stream ', streamID, ' finished: ', err || '');
        if (err) {
          sock.write(JSON.stringify({error: err.message}));
          sock.end();
        } else {
          sock.write(JSON.stringify({sending: streamID}));
          // pipe stored stream to tcp client
          // Switch the stream into flowing-mode
          // this allows piping the one stream to multiple streams
          // it does mean clients will lose data if they don't
          // handle the stream right away.
          debug('resume');
          stream.resume();
          stream.pipe(sock);
          stream.on('end', function() {
            debug('TCP server: stream ', streamID, 'end');
          });
        }
      });
    });
    sock.on('error', function(err) {
      debug('TCP connection error: ', err);
    });
  }

  function StreamData(id, stream) {
    this.id = id;
    this.stream = stream;
    this.streaming = false;
    this.timeout;
  }
}

from socket.io-stream.

nkzawa avatar nkzawa commented on July 24, 2024

I think limit the number of streams and timeout are awesome 🏆 even if it's not offered as API. Without that, malicious users can easily send a lot of streams if they want :(

from socket.io-stream.

peteruithoven avatar peteruithoven commented on July 24, 2024

Thanks. I don't think this should be a part of socker.io-stream itself, but maybe someone can turn it into a separate module, also because it isn't even socket.io-stream specific. I'll close this issue, this was just a way to share some solutions.

from socket.io-stream.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.