mqtt.js's People

Contributors

4rzael, anhldbk, anjz, behrad, bertkleewein, boristian, gkwicker, itavy, jeloou, johnnyman727, kmpm, manu3756, mcollina, mocheng, ogis-fujiwara, ogis-yamazaki, prabathabey, qm3ster, radekg, ralphtheninja, rangermauve, redboltz, robertslando, roccomuso, samirnaik, scarry1992, sublimator, wcatron, wolfeidau, yodama

mqtt.js's Issues

generate.js generates client IDs that have a high potential for collision

In generate.js, the defaultClientID being generated has a high potential for collision with other clients using the same library: https://github.com/adamvr/MQTT.js/blob/master/lib/generate.js#L11

It is totally possible for several client machines in a distributed system to start up with the same PID and to collide. They would then share the same client ID.

A better solution would be to use a combination of UTC timestamp and some random/static string as a default client ID to reduce the risk of collision.
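The suggestion above could be sketched roughly as follows. The helper name `generateClientId` and the `mqtt_` prefix are assumptions for illustration, not the package's actual code; the ID is kept short because MQTT 3.1 limits client identifiers to 23 characters.

```javascript
const crypto = require('crypto');

// Hypothetical helper: builds a default client ID from the current UTC
// timestamp plus random bytes, so two machines sharing a PID still differ.
function generateClientId(prefix) {
  const timestamp = Date.now().toString(36);            // UTC milliseconds, base 36
  const random = crypto.randomBytes(4).toString('hex'); // 8 hex characters
  return (prefix || 'mqtt_') + timestamp + '_' + random;
}

console.log(generateClientId());
```

A static per-host string could replace the random part if stable IDs are needed across restarts.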

Auto-reconnecting client

A useful feature to add to the Client would be automatic reconnection.
It could also resend all non-acked messages on reconnection.
Moreover, it should support sending messages while offline (up to a certain point).
This is very similar to how the 'reds' package works.

Is this feasible in the context of this package?
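As a rough illustration of the "sending while offline, up to a certain point" part, here is a minimal sketch of a bounded queue that is flushed on reconnect. `OfflineQueue`, `sendFn` and `maxQueued` are hypothetical names, and the sketch does not cover reconnecting the socket or resending non-acked messages.

```javascript
// Hypothetical wrapper, not part of the package: queues publishes while
// offline (up to maxQueued) and flushes them in order on reconnect.
function OfflineQueue(sendFn, maxQueued) {
  this.send = sendFn;          // function (topic, payload) that really sends
  this.maxQueued = maxQueued;  // the "certain point" beyond which we refuse
  this.connected = false;
  this.pending = [];
}

OfflineQueue.prototype.publish = function (topic, payload) {
  if (this.connected) {
    this.send(topic, payload);
    return true;
  }
  if (this.pending.length >= this.maxQueued) {
    return false;              // queue full: the caller decides what to do
  }
  this.pending.push({ topic: topic, payload: payload });
  return true;
};

// Call when the connection is (re)established.
OfflineQueue.prototype.onConnect = function () {
  this.connected = true;
  while (this.pending.length > 0) {
    var msg = this.pending.shift();
    this.send(msg.topic, msg.payload);
  }
};

// Call when the connection is lost.
OfflineQueue.prototype.onClose = function () {
  this.connected = false;
};
```

Returning `false` when the queue is full leaves the drop-or-retry policy to the application.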

Not available in npm?

Attempting to npm install mqttjs...

npm http GET https://registry.npmjs.org/mqttjs
npm http 200 https://registry.npmjs.org/mqttjs
npm ERR! Error: No compatible version found: mqttjs
npm ERR! No valid targets found.
npm ERR! Perhaps not compatible with your version of node?
npm ERR!     at installTargetsError (/usr/local/lib/node_modules/npm/lib/cache.js:506:10)
npm ERR!     at next_ (/usr/local/lib/node_modules/npm/lib/cache.js:452:17)
npm ERR!     at next (/usr/local/lib/node_modules/npm/lib/cache.js:427:44)
npm ERR!     at /usr/local/lib/node_modules/npm/lib/cache.js:419:5
npm ERR!     at saved (/usr/local/lib/node_modules/npm/node_modules/npm-registry-client/lib/get.js:136:7)
npm ERR!     at /usr/local/lib/node_modules/npm/node_modules/graceful-fs/graceful-fs.js:230:7
npm ERR!     at Object.oncomplete (fs.js:297:15)
npm ERR!  [Error: No compatible version found: mqttjs
npm ERR! No valid targets found.
npm ERR! Perhaps not compatible with your version of node?]
npm ERR! You may report this log at:
npm ERR!     <http://github.com/isaacs/npm/issues>
npm ERR! or email it to:
npm ERR!     <[email protected]>

npm ERR! System Darwin 11.4.0
npm ERR! command "node" "/usr/local/bin/npm" "install" "mqttjs"
npm ERR! cwd /Users/andyp
npm ERR! node -v v0.8.0
npm ERR!     npm -v 1.1.32
npm ERR! message No compatible version found: mqttjs
npm ERR! message No valid targets found.
npm ERR! message Perhaps not compatible with your version of node?
npm ERR! 
npm ERR! Additional logging details can be found in:
npm ERR!     /Users/andyp/npm-debug.log
npm ERR! not ok code 0

mqtt mobile solution discussion and some suggestions about the current MQTT.js implementation

Hi guys, I used MQTT.js version 0.2.4 in my research and found that connection.js (which in 0.2.4 uses a Buffer to handle socket reads) behaves incorrectly when handling large data that cannot be read in one shot. I tried to modify it without changing the original mechanism, and below is what I got.

connection.js in mqtt\lib

var events = require('events')
  , util = require('util')
  , protocol = require('./protocol')
  , generate = require('./generate')
  , parse = require('./parse');

var Connection = module.exports = 
function Connection(stream, server) {
  this.server = server;
  this.stream = stream;
  //buffer used to record unread data
  this.buffer = null;
  this.packet = {};
  var that = this;
  this.stream.on('data', function (buf) {
    that.parse(buf);
  });
  events.EventEmitter.call(this);
};
util.inherits(Connection, events.EventEmitter);

Connection.prototype.parse = function(buf) {
    if(null !== this.buffer){
        //if there's unread buffer from last call,
        //add to the current buf
        buf = Buffer.concat([this.buffer, buf], 
                this.buffer.length + buf.length);

    }

    var pos = 0,                //next read position
    len = buf.length;           //total buf length

    if('undefined' !== typeof buf[0]){
        if(!this.packet.cmd){
            //parse the header
            parse['header'](buf.slice(0,1), this.packet);
            pos++;
        }
        if(!this.packet.length){
            //parse the remaining length
            var tmp = {mul: 1, length: 0};
            var _pos = pos;
            //trying to read remaining length, record the read position
            //may fail if lack of available data,
            //so use `_pos` instead of manipulating directly on `pos`
            do{
                if(_pos >= len){
                    //not enough data 
                    //record the unread buffer 
                    this.buffer = buf.slice(pos, len);
                    return;
                }
                tmp.length += 
                    tmp.mul *(buf[_pos] & protocol.LENGTH_MASK);
                tmp.mul *= 0x80;
            }while(
                (buf[_pos++] & protocol.LENGTH_FIN_MASK) !== 0
            );
            //successfully read the remaining length
            //record and set `pos` to the right position
            this.packet.length = tmp.length;
            pos = _pos;
        }
        if(len - pos < this.packet.length){
            //not enough data for variable header & payload
            //record the unread buffer, then return
            this.buffer = buf.slice(pos, len);
            return;
        }else{
            //enough
            parse[this.packet.cmd](
                buf.slice(pos, this.packet.length + pos), 
                this.packet
            );
            //renew the read position
            pos += this.packet.length;
            //emit event
            this.emit(this.packet.cmd, this.packet);
            //reset packet
            this.packet = {};
        }

        if(pos < len){
            //still have some data, 
            //must belong to the next mqtt message
            //record in buffer
            this.buffer = buf.slice(pos, len);
        }else{
            //no data left
            //clear buffer
            this.buffer = null;
        }
    }
};

I did some testing and it seems to work well. Hope it helps.

Then, today I saw that mcollina modified connection.js to use readable-stream. I tried to understand it, and I think the part that parses the remaining length is hard to follow, so I tried to write it myself and came up with the following.

In Connection's constructor, add a new property called remLen:
this.remLen = []; //used to record remaining length digits

Then in Connection#parse():

...//content above
if(!this.packet.length){
  var byte;
  do{
    //try to read one byte
    byte = this.stream.read(1);

    if(byte === null){
      //unsuccessful
      //wait for next data
      return;
    }else{
      //succeeded
      //save it and read next
      this.remLen.push(byte[0]);
    }
    //keep reading until the end of `remaining length`
  }while((byte[0] & 0x80) !== 0);

  //to record the length
  var _length = 0;
  //handle `remaining length` digits
  for(var i=0;i < this.remLen.length;i++){
    //for i = 0,1,2,3; mul = 1, 128, 128*128, 128*128*128;
    var multiplier = Math.pow(0x80, i);
    _length += (this.remLen[i] & 0x7f)*multiplier;
  }
  this.packet.length = _length;
  //reset the digits for the next packet
  this.remLen = [];
}
...//rest

Untested, but maybe a little easier to understand.
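For reference, the remaining-length decoding on its own can be written as a small pure function over a buffer. This is only a sketch: the name `decodeRemainingLength` and its return shape are assumptions, and it reads from a buffer rather than from the stream as the snippet above does.

```javascript
// Decodes the MQTT "remaining length" field starting at `offset`.
// Returns null when the buffer ends before the field is complete.
function decodeRemainingLength(buf, offset) {
  var length = 0;
  var multiplier = 1;
  for (var i = 0; i < 4; i++) {            // the field is at most 4 bytes long
    if (offset + i >= buf.length) {
      return null;                          // need more data
    }
    var digit = buf[offset + i];
    length += (digit & 0x7f) * multiplier;  // low 7 bits carry the value
    if ((digit & 0x80) === 0) {             // high bit clear: last byte
      return { length: length, bytes: i + 1 };
    }
    multiplier *= 0x80;
  }
  throw new Error('malformed remaining length');
}
```

Returning the number of bytes consumed lets the caller advance its read position only after the whole field has been seen.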

Finally, I want to discuss an MQTT mobile solution with you, and hopefully get some inspiration. My project is a push notification system for server-to-mobile communication. The mobile network here is quite under-developed: most people still use 2G, and the connection can be very unstable while moving. Given this, I'm wondering whether UDP would be a better choice than TCP, since it is really hard to maintain a long-lived connection. Another question is which one would use less power under frequent transmissions. Any advice is appreciated.

correct handling of keep-alive

As stated on the MQTT website (link), the client MUST send a pingreq before the keep-alive period elapses; otherwise the server can drop the connection.

Where can the timer be inserted?
The important thing is that the timer must be deleted after the connection is closed.
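One possible shape for such a timer is sketched below. It assumes the connection object is an EventEmitter with a `pingreq()` method that emits 'close' when it ends; `startKeepalive` is a hypothetical helper, and pinging at half the keep-alive period is an arbitrary safety margin rather than something the spec prescribes.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: `conn` is assumed to be an EventEmitter with a
// pingreq() method that emits 'close' when the connection ends.
function startKeepalive(conn, keepaliveSeconds) {
  // Ping at half the keep-alive period so the deadline is not missed.
  var timer = setInterval(function () {
    conn.pingreq();
  }, (keepaliveSeconds * 1000) / 2);

  var state = { active: true };
  conn.once('close', function () {
    clearInterval(timer);   // delete the timer once the connection is closed
    state.active = false;
  });
  return state;
}

// Usage with a stand-in connection object.
var conn = new EventEmitter();
conn.pingreq = function () { console.log('pingreq sent'); };
var keepalive = startKeepalive(conn, 60);
conn.emit('close');
console.log(keepalive.active);
```

Tying the cleanup to the 'close' event means the timer cannot outlive the connection, whichever side closes it.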

Implement offline messages, a la node_redis

From #42.

For instance:

var client = require('mqtt').createClient();
client.subscribe('test');
client.publish('test', 'test');
client.on('message', console.log);
// some time later
client.on('connect', function(){
  console.log('connected');
});

client subscription - JSON as payload received

This is the JSON I receive as a payload:

{
   "keyone":"value",
   "keytwo":"value"
}

Now, I try and parse the JSON (all failed attempts):

client.on('publish', function (packet) {
  console.log('Received: ' + new String(packet.payload)); // OK

  var json = JSON.parse(JSON.stringify(packet.payload));  
  console.log('key1: ' + json.keyone); // undefined
});
client.on('publish', function (packet) {
  console.log('Received: ' + new String(packet.payload)); // OK

  var json = JSON.parse(packet.payload);  
  console.log('key1: ' + json.keyone); // undefined
});
client.on('publish', function (packet) {
  console.log('Received: ' + new String(packet.payload)); // OK

  var json = JSON.parse(packet.payload.toString());  
  console.log('key1: ' + json.keyone); // undefined
});

Not sure what I'm doing wrong here, but when I subscribe on a topic and the client receives any JSON string as the payload (I've validated the JSON that prints out), I can't seem to figure out what the hell I'm given back. I've tried multiple ways of doing this (see above) but nothing seems to work. What am I missing?! Please help :)
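For comparison, here is a minimal standalone sketch of decoding a JSON payload. It assumes `packet.payload` arrives as a Buffer or a string holding the JSON text shown above; `parseJsonPayload` is a hypothetical helper, and the sketch does not diagnose why the attempts above returned undefined.

```javascript
// Assumes the payload is a Buffer or a string holding JSON text.
function parseJsonPayload(payload) {
  var text = Buffer.isBuffer(payload) ? payload.toString('utf8') : String(payload);
  try {
    return JSON.parse(text);
  } catch (err) {
    return null; // not valid JSON
  }
}

// Stand-in for a received packet.
var packet = { payload: Buffer.from('{"keyone":"value","keytwo":"value"}') };
var json = parseJsonPayload(packet.payload);
console.log('key1: ' + json.keyone); // prints "key1: value"
```

Logging `typeof packet.payload` and the raw text before parsing may help show what the client is actually handing back.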

messageId and qos, where are they?

I'm using your package to do the broker and client, in the client I have:

client.publish({ qos: 1, messageId: 1, topic: "test_topic", payload: JSON.stringify({client: client_id, test: true}) });

and in the broker this is what I get, for ( var key in packet ):

Client mqtt_node_4238 connected.
-----------> cmd :: publish
-----------> retain :: false
-----------> qos :: 0
-----------> dup :: false
-----------> length :: 59
-----------> topic :: test_topic
-----------> payload :: {"client":"mqtt_node_4238","test":true}
Sending puback message to client mqtt_node_4238 for message undefined(QOS:0)

I can't see a messageId and my qos is always 0. How come? This is node v0.6.18.

Topics and uniqueness in orig.js example

There seems to be an issue with the regular expression that matches topics in the orig.js example. The problem arises if you subscribe to a topic: /system/app

Then it will also match /system/appp.
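A sketch of one way to avoid the prefix match: escape the topic and anchor the expression at both ends. `topicToRegExp` is a hypothetical helper, not the code from orig.js, and it deliberately ignores the MQTT wildcards `+` and `#`.

```javascript
// Hypothetical helper: turns a literal topic into an anchored RegExp so that
// '/system/app' no longer matches '/system/appp'. Wildcards are not handled.
function topicToRegExp(topic) {
  // Escape characters that are special inside a regular expression.
  var escaped = topic.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  return new RegExp('^' + escaped + '$');
}

var matcher = topicToRegExp('/system/app');
console.log(matcher.test('/system/app'));  // true
console.log(matcher.test('/system/appp')); // false
```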

Module now requires Node >= v0.7.11

The recent changes to use Buffer.concat (generate.js) break compatibility with Node v0.6.x (concat was added in 0.7.11). I think there's probably a way to fall back to one of the modules that implement this for older Node versions but I haven't made time to look into it. If nobody cares about 0.6, the resolution to this issue is likely just to update the package.json engines requirement.
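Instead of pulling in a separate module, a small inline fallback is another option. The sketch below is an assumption about what such a shim could look like (`manualConcat` and `concatBuffers` are hypothetical names), not code from the package.

```javascript
// Hypothetical shim: manual concatenation for runtimes without Buffer.concat.
function manualConcat(list) {
  var total = 0;
  for (var i = 0; i < list.length; i++) {
    total += list[i].length;
  }
  // Buffer.alloc is used when present; older runtimes only have `new Buffer`.
  var out = Buffer.alloc ? Buffer.alloc(total) : new Buffer(total);
  var pos = 0;
  for (var j = 0; j < list.length; j++) {
    list[j].copy(out, pos);
    pos += list[j].length;
  }
  return out;
}

// Prefer the native implementation when the runtime provides it.
var concatBuffers = typeof Buffer.concat === 'function'
  ? function (list) { return Buffer.concat(list); }
  : manualConcat;
```

Callers would then use `concatBuffers([a, b])` wherever `Buffer.concat([a, b])` appears today.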

Thanks for the very helpful module, Adam and contributors.

Compatibility with <0.7.11 due to readable-stream's dependency on Buffer.concat

  1. MqttServer should emit MqttServerClients:
    TypeError: Object function Buffer(subject, encoding, offset) {
    if (!(this instanceof Buffer)) {
    return new Buffer(subject, encoding, offset);
    }
    var type;
    // Are we slicing?
    if (typeof offset === 'number') {
    this.length = coerce(encoding);
    this.parent = subject;
    this.offset = offset;
    } else {
    // Find the length
    switch (type = typeof subject) {
    case 'number':
    this.length = coerce(subject);
    break;
    case 'string':
    this.length = Buffer.byteLength(subject, encoding);
    break;
    case 'object': // Assume object is an array
    this.length = coerce(subject.length);
    break;
    default:
    throw new Error('First argument needs to be a number, ' +
    'array or string.');
    }
    if (this.length > Buffer.poolSize) {
    // Big buffer, just alloc one.
    this.parent = new SlowBuffer(this.length);
    this.offset = 0;
    } else {
    // Small buffer.
    if (!pool || pool.length - pool.used < this.length) allocPool();
    this.parent = pool;
    this.offset = pool.used;
    pool.used += this.length;
    }
    // Treat array-ish objects as a byte array.
    if (isArrayIsh(subject)) {
    for (var i = 0; i < this.length; i++) {
    this.parent[i + this.offset] = subject[i];
    }
    } else if (type == 'string') {
    // We are a string
    this.length = this.write(subject, 0, encoding);
    }
    }
    SlowBuffer.makeFastBuffer(this.parent, this, this.offset, this.length);
    } has no method 'concat'
    at fromList (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:816:20)
    at Readable.read (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:308:11)
    at MqttServerClient.parse (/home/adam/projects/MQTT.js/lib/connection.js:75:31)
    at Readable.<anonymous> (/home/adam/projects/MQTT.js/lib/connection.js:28:10)
    at Readable.emit (events.js:64:17)
    at emitReadable_ (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:385:10)
    at emitReadable (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:380:5)
    at readableAddChunk (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:145:7)
    at Readable.push (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:115:10)
    at Socket.<anonymous> (/home/adam/projects/MQTT.js/node_modules/readable-stream/lib/_stream_readable.js:755:20)
    at Socket.emit (events.js:67:17)
    at TCP.onread (net.js:367:14)

'close' event not sent by MqttConnection

Hi,
the 'close' event is currently not sent by MqttConnection (not that it is documented, but it's in the README.md Server API example).

I was going to send a pull request that makes MqttConnection echo the 'close' event from the stream but I'm not sure if the original intent was to expose the underlying stream of MqttConnection in which case only the readme example and MqttConnection documentation should be updated.

Add retained status when emitting message

Before the update, it was possible to access a message's retained status. With the new code, only the topic and message of a received packet are exposed in the message event.

In some situations it is very useful to have this information. Could we expose it again?

Client must use the QoS assigned by the broker

At the moment the client asks the broker for a subscription with a given QoS.
The broker then decides which QoS to assign (0, 1 or 2) and returns it inside a suback packet.
That is the 'real' QoS.

Moreover, the 'user' is not notified: the granted QoS is not passed to the callback.
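To illustrate what passing the granted QoS back could look like, here is a small sketch. `applyGrantedQos` is a hypothetical helper, and `granted` is assumed to be the list of QoS values carried by the suback packet, in the same order as the requested topics.

```javascript
// Hypothetical helper: pairs each requested subscription with the QoS the
// broker granted. `granted` is assumed to be the list of QoS values from the
// suback packet, in the same order as the request.
function applyGrantedQos(requested, granted) {
  return requested.map(function (sub, i) {
    return { topic: sub.topic, requestedQos: sub.qos, qos: granted[i] };
  });
}

// Example: the client asked for QoS 2 and 1, the broker granted 1 and 1.
var result = applyGrantedQos(
  [{ topic: 'a', qos: 2 }, { topic: 'b', qos: 1 }],
  [1, 1]
);
console.log(result);
```

The resulting array could be handed to the subscribe callback so the user sees the QoS actually in effect.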

packet error

When I test the mqtt server, an error occurs after a pingreq:

--------------pingreq
4 Oct 02:53:31 - Pinging
4 Oct 02:53:31 - Ping packet: <Buffer d0 00>
4 Oct 03:06:31 - Data received from client at 127.0.0.1
4 Oct 03:06:31 - <Buffer 30 22 00 10 74 6f 6b 75 64 75 2f 6b 65 65 70 61 6c 69 76 65>
4 Oct 03:06:31 - Adding data to buffer:
<Buffer 30 22 00 10 74 6f 6b 75 64 75 2f 6b 65 65 70 61 6c 69 76 65>
4 Oct 03:06:31 - Packet info: { command: 3, dup: false, qos: 0, retain: false }
4 Oct 03:06:31 - Length calculated: 36
4 Oct 03:06:31 - Incomplete packet, bytes needed to complete: 16
4 Oct 03:06:31 - Data received from client at 127.0.0.1
4 Oct 03:06:31 - <Buffer 39 37 37 34 64 35 36 64 36 38 32 65 35 34 39 63>
4 Oct 03:06:31 - Adding data to buffer:
<Buffer 30 22 00 10 74 6f 6b 75 64 75 2f 6b 65 65 70 61 6c 69 76 65 39 37 37 34 64 35 36 64 36 38 32 65 35 34 39 63>
4 Oct 03:06:31 - Packet complete
<Buffer 30 22 00 10 74 6f 6b 75 64 75 2f 6b 65 65 70 61 6c 69 76 65 39 37 37 34 64 35 36 64 36 38 32 65 35 34 39 63>

/mqtt.js:186
packet.body = chunk.slice((client.packet.lengthLength + 1), chunk.length)
^
TypeError: Cannot set property 'body' of undefined

There are some serious, serious memory issues here - fixes provided

Hi,

I have been working with a fork of your lib for quite a while now and I've managed to identify a number of them. First of all - the most obvious, undeclared variables. Here's a commit that fixes those:

https://github.com/radekg/MQTT.js/commit/5b7a1aa3fb581dde3cd1320bb2def89ebc1e1a3f
and one more here (in parse.js):
https://github.com/radekg/MQTT.js/commit/fddfaa33d832c0990223742475565a65fcbcb2e4

But the biggest memory issue is in connection.js. I was running 10000 connections for about 18 hours, each publishing 3.5KB of payload every 4 minutes, and the memory was constantly growing, no matter what I did to my code. I finally managed to nail it down.

The issue is that your main buffer in Connection object always grows. Always. Here's the fix:

https://github.com/radekg/MQTT.js/commit/f31ac1d9cdf66a27b7d748e79cfd23843465ca13
https://github.com/radekg/MQTT.js/commit/fec6b1bff81c353d88d9ffc3ace1186dc9279bc0

Basically, after:

this.buffer.read = pos;
this.buffer.written = len;

You, sir, need:

// Discard the old data in the buffer to free up the RAM
var tmpBuf = new Buffer( this.buffer.written - this.buffer.read );
this.buffer.copy( tmpBuf, 0, this.buffer.read, this.buffer.written );
this.buffer = null;
this.buffer = tmpBuf;
this.buffer.read = 0;
this.buffer.written = tmpBuf.length;
tmpBuf = null;

Also, when resizing the buffer at the top of parse, there's no need to allocate more than the new buffer.

In the last commit, in parse.js there's also a few memory management lines that will not hurt.
Hope this helps someone. Or just use the fork:

https://github.com/radekg/MQTT.js

And with npm:

npm install [email protected]:radekg/MQTT.js.git#master

Not listed on Mqtt.org website

Hi,

I just noticed, that the module is not listed on the Mqtt.org website. It would probably be great if it was represented there so people can find it when browsing libraries for their language.

server - get versionNum/username/password from client object

I see that client.id can be grabbed from a connection, but I can't seem to get the versionNum/username/password from the client object (so I can verify auth with a db). I know that the client works with mqtt.io broker using user/pass and versionNum: 3.1.

    clients = { }

    server = mqtt.createServer (client) =>
      console.log 'Broker:mqtt:createServer'

      # Catch when client connects
      client.on 'connect', (packet) =>
        console.log 'Broker:connect'

        client.connack
          returnCode: 0

        client.id = packet.client

        console.log 'version: ' + client.versionNum
        console.log 'client: ' + client.id # COMES THROUGH FINE!
        console.log 'username: ' + client.username # undefined
        console.log 'password: ' + client.password # undefined

        clients[client.id] = client

        console.log 'clients: ' + JSON.stringify clients

   ...

    server.listen 1883

Regression on 0.2.5

Unfortunately my refactoring has introduced some regression in the parsing of subscribe packets.
I'll look at it soon!

     TypeError: Cannot read property '0' of null
      at Object.module.exports.subscribe (/Users/matteo/Repositories/mosca/node_modules/mqtt/lib/parse.js:165:26)
      at MqttServerClient.Connection.parse (/Users/matteo/Repositories/mosca/node_modules/mqtt/lib/connection.js:87:25)
      at Readable.module.exports (/Users/matteo/Repositories/mosca/node_modules/mqtt/lib/connection.js:28:10)
      at Readable.EventEmitter.emit (events.js:93:17)
      at emitReadable_ (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:385:10)
      at emitReadable (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:380:5)
      at onEofChunk (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:360:5)
      at readableAddChunk (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:132:5)
      at Readable.push (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:115:10)
      at Socket.<anonymous> (/Users/matteo/Repositories/mosca/node_modules/mqtt/node_modules/readable-stream/lib/_stream_readable.js:746:10)
      at Socket.EventEmitter.emit (events.js:93:17)
      at TCP.onread (net.js:418:51)

Multiple clients to same server

I was having major issues with connecting multiple clients (in the same node js process) to the same server. Basically, the last client that I connected with would receive all messages incorrectly. After some debugging I noticed that the "defaultClient" name is defined as a global in "generate.js". I suggest this be moved into the "connect" function:

generate.js:

module.exports.connect = function(opts) {
  var defaultClient = 'mqtt_' + (new Date).valueOf() * Math.random() + (clientCounter++);
  ...
}

clientid must be client

The wiki states the clientid option can be used to specify a client id at connection time. However, the option is not clientid, but client, e.g.:
client: 'subscriber1'

cannot run client_test.js

Hi,
I tried a couple of ways, but neither works.

  1. npm install mqtt, then run examples/client/client_test.js. Got the error
    "has no method 'concat'" at lib/generate.js, line 139.
  2. Downloaded the zip from GitHub (MQTT.js-master) and
    ran client_test.js; the error is shown below:
    /home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:816
    ret = Buffer.concat(list, length);
    ^
    TypeError: Object function Buffer(subject, encoding, offset) {
    if (!(this instanceof Buffer)) {
    return new Buffer(subject, encoding, offset);
    }

var type;

// Are we slicing?
if (typeof offset === 'number') {
this.length = coerce(encoding);
this.parent = subject;
this.offset = offset;
} else {
// Find the length
switch (type = typeof subject) {
case 'number':
this.length = coerce(subject);
break;

  case 'string':
    this.length = Buffer.byteLength(subject, encoding);
    break;

  case 'object': // Assume object is an array
    this.length = coerce(subject.length);
    break;

  default:
    throw new Error('First argument needs to be a number, ' +
                    'array or string.');
}

if (this.length > Buffer.poolSize) {
  // Big buffer, just alloc one.
  this.parent = new SlowBuffer(this.length);
  this.offset = 0;

} else {
  // Small buffer.
  if (!pool || pool.length - pool.used < this.length) allocPool();
  this.parent = pool;
  this.offset = pool.used;
  pool.used += this.length;
}

// Treat array-ish objects as a byte array.
if (isArrayIsh(subject)) {
  for (var i = 0; i < this.length; i++) {
    this.parent[i + this.offset] = subject[i];
  }
} else if (type == 'string') {
  // We are a string
  this.length = this.write(subject, 0, encoding);
}

}

SlowBuffer.makeFastBuffer(this.parent, this, this.offset, this.length);
} has no method 'concat'
at fromList (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:816:20)
at Readable.read (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:308:11)
at Connection.parse (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/lib/connection.js:87:31)
at Readable.<anonymous> (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/lib/connection.js:31:12)
at Readable.emit (events.js:64:17)
at emitReadable_ (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:385:10)
at emitReadable (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:380:5)
at readableAddChunk (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:145:7)
at Readable.push (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:115:10)
at Socket.<anonymous> (/home/ubuntu/demo/demoNodeApp/node_modules/MQTT.js-master/node_modules/readable-stream/lib/_stream_readable.js:755:20)

Stream2

I've had a look at the new 'stream2' API, and it seems much better suited to our use case.
It seems possible to avoid buffering altogether while reading packets.

Support for node 0.10

Currently the package does not pass the tests on node 0.10.

$ ./node_modules/.bin/mocha test

  ․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․․Assertion failed: (uv__stream_fd(stream) >= 0), function uv_shutdown, file ../deps/uv/src/unix/stream.c, line 1084.
Abort trap: 6

They changed the stream API.
See this: http://blog.nodejs.org/2013/03/11/node-v0-10-0-stable/

MQTT messages seem to be sent multiple times

I am following your example of the broadcast server and pub client, but have come across an issue where the messages seem to be sent more than once.

var mqtt = require('mqttjs');

var server = mqtt.createServer(function(client) {
    var clients = {};

    client.on('connect', function(packet) {
        console.log("[connect] %s", JSON.stringify(packet));
        client.connack({returnCode: 0});
        clients[client.id] = client;
    });

    client.on('publish', function(packet) {
        console.log("[publish] %s", JSON.stringify(packet));

        for (var clientId in clients) {
            clients[clientId].publish({topic: packet.topic, payload: packet.payload});
        }
    });

    client.on('subscribe', function(packet) {
        console.log("[subscribe] %s", JSON.stringify(packet));
    });

    client.on('pingreq', function(packet) {
        console.log("[pingreq] %s", JSON.stringify(packet));
    });

    client.on('disconnect', function(packet) {
        console.log("[disconnect] %s", JSON.stringify(packet));
    });

    client.on('close', function(packet) {
        console.log("[close] %s", JSON.stringify(packet));
    });

    client.on('error', function(packet) {
        console.log("[error] %s", JSON.stringify(packet));
    });
});
server.listen(9999);
console.log("Listening on port 9999");

Here is my pub client

/**
 * Created by JetBrains WebStorm.
 * User: jima
 * Date: 6/02/12
 * Time: 9:58 PM
 * To change this template use File | Settings | File Templates.
 */
var mqtt = require('mqttjs');
var argv = process.argv;

for (var index = 2; index <= 4; index++) {
    if (!argv[index]) {
        process.exit(-1);
    }

    var port = argv[2];
    var topic = argv[3];
    var payload = argv[4];

    var client = mqtt.createClient(port, "localhost", function(client) {
        client.connect({keepalive: 3});

        client.on('connack', function(packet) {
            if (packet.returnCode === 0) {
                client.publish({topic: topic, payload: payload});
                client.disconnect();
            } else {
                console.log("connack error %d", packet.returnCode);
                process.exit(-1);
            }
        });

        client.on('close', function() {
            process.exit(0);
        });

        client.on('error', function(error) {
            console.log("error %s", error);
            process.exit(-1);
        })
    });
}

and here is the output

Listening on port 9999
[connect] {"cmd":"connect","retain":false,"qos":0,"dup":false,"length":24,"version":"MQIsdp","versionNum":3,"clean":false,"keepalive":3,"client":"mqtt_53593"}
[connect] {"cmd":"connect","retain":false,"qos":0,"dup":false,"length":24,"version":"MQIsdp","versionNum":3,"clean":false,"keepalive":3,"client":"mqtt_53593"}
[connect] {"cmd":"connect","retain":false,"qos":0,"dup":false,"length":24,"version":"MQIsdp","versionNum":3,"clean":false,"keepalive":3,"client":"mqtt_53593"}
[publish] {"cmd":"publish","retain":false,"qos":0,"dup":false,"length":11,"topic":"jima","messageId":28001,"payload":"hello"}
[disconnect] {"cmd":"disconnect","retain":false,"qos":0,"dup":false,"length":0}
[publish] {"cmd":"publish","retain":false,"qos":0,"dup":false,"length":11,"topic":"jima","messageId":28001,"payload":"hello"}
[disconnect] {"cmd":"disconnect","retain":false,"qos":0,"dup":false,"length":0}
[publish] {"cmd":"publish","retain":false,"qos":0,"dup":false,"length":11,"topic":"jima","messageId":28001,"payload":"hello"}
[disconnect] {"cmd":"disconnect","retain":false,"qos":0,"dup":false,"length":0}

Is this part of the QoS?

Docs wrong for new API subscribe

Migration page says:

The option to specify a single subscription topic has been removed from Connection#subscribe, subscriptions must be supplied as an array of {topic: x, qos: 0} objects.

Example page says (and what works):

client.subscribe('example');

That's surely not an array of objects :)
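If both call styles are meant to be supported, the two forms could be reconciled with a small normalization step like the sketch below. `normalizeSubscriptions` is a hypothetical helper, not the library's implementation, and defaulting to QoS 0 is an assumption.

```javascript
// Hypothetical helper: accepts a topic string, an array of topic strings, or
// an array of {topic, qos} objects and always returns the object form.
function normalizeSubscriptions(input, defaultQos) {
  var list = Array.isArray(input) ? input : [input];
  return list.map(function (item) {
    if (typeof item === 'string') {
      return { topic: item, qos: defaultQos || 0 };
    }
    return { topic: item.topic, qos: item.qos || 0 };
  });
}

console.log(normalizeSubscriptions('example'));
console.log(normalizeSubscriptions([{ topic: 'x', qos: 1 }]));
```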

parse.js is not parsing publish messages properly

In this function https://github.com/adamvr/MQTT.js/blob/master/lib/parse.js#L105 there are two problems:

The parser is not adding +2 to the topic length when moving position to take into account the 2 bytes for the string length.

The parser is always parsing message ID even when QoS is 0. For messages with QoS 0 this means that the message ID being currently parsed is the last 2 bytes of the topic name and this is wrong.

Per the MQTT spec the message ID will only be present if QoS is 1 or 2:
http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#publish

"Message ID
Present for messages with QoS level 1 and QoS level 2. See Message identifiers for more details."
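The two points above can be sketched as a standalone parser. This is an illustration under stated assumptions rather than a patch for parse.js: `parsePublishBody` is a hypothetical name, `buf` is assumed to hold the bytes after the fixed header, and `qos` is assumed to have been read from the fixed header flags.

```javascript
// Sketch of parsing a PUBLISH variable header and payload. `buf` holds the
// bytes after the fixed header; `qos` comes from the fixed header flags.
function parsePublishBody(buf, qos) {
  var pos = 0;
  var topicLength = buf.readUInt16BE(pos);
  pos += 2;                                  // skip the 2-byte string length
  var topic = buf.toString('utf8', pos, pos + topicLength);
  pos += topicLength;

  var packet = { topic: topic };
  if (qos > 0) {                             // message ID only for QoS 1 and 2
    packet.messageId = buf.readUInt16BE(pos);
    pos += 2;
  }
  packet.payload = buf.slice(pos);           // everything left is the payload
  return packet;
}
```

With QoS 0 the payload starts immediately after the topic, so no topic or payload bytes are misread as a message ID.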

NPM packaging

Could you package this library for NPM?

If you want, I could do it.

Behaviour of createClient when dealing with errors

It is possible for createClient to fail without the associated callback ever being called.
The API should be consistent with Node.js best practices, passing an "error" as the first parameter and the associated client as the second one.

Open question: Is it worth simplifying MqttServerClients?

Here's the question: is it worth simplifying MqttServerClient? By which I mean, do we want to add features to them like:

  • Automatic pinging
  • QoS handling
  • Resending
  • Simplified API (a la MqttClient)

My thinking is that if you're writing a server, you probably want the fine grained control that you get from bare MqttConnection.

What do you guys reckon?
