sskaje / mqtt
MQTT Client class
Home Page: https://sskaje.me/category/MQTT/
License: MIT License
Greetings,
Many thanks for your work, it has proven to be really helpful.
Branch 2.0 has been out for some time now, and until now we did not run into major issues for our use cases.
Would it be possible to release a stable 2.0 version, perhaps after merging it with the tip of master?
From a Composer and Packagist point of view, it would simplify our integration process.
Regards,
Reported by @w3yyb
Oh my, I was too quick to jump the gun :/ QoS 2 works perfectly!
On some environments (encountered on CentOS 6/7 with PHP 5.3.x/5.4.x), publishing large content with any QoS (for me, it choked on a little over 42 KB at QoS 2) can produce a PHP Notice: fwrite(): send of x bytes failed with errno=11 Resource temporarily unavailable in /tmp/mqtt/mqtt/SocketClient.php on line 166
Indeed, it seems that write($packet, $packet_size) in SocketClient.php sometimes returns null or 0 instead of false or the length written.
I know MQTT is not meant for large content, but some printers use the MQTT protocol, and sometimes you need to pass really big files such as PDFs or many images. (I had also already invested too much time in this not to solve it.)
I modified the write($packet, $packet_size) function in SocketClient.php on line 175 like so:
/**
 * Send data
 *
 * @param string $packet
 * @param int $packet_size
 * @return int|false
 */
public function write($packet, $packet_size)
{
    if (!$this->socket || !is_resource($this->socket)) return false;
    Debug::Log(Debug::DEBUG, "socket_write(length={$packet_size})", $packet);
    #add
    // Keep calling fwrite() until the whole packet has been sent;
    // a short write (including 0 bytes) just triggers another attempt.
    do {
        $written = fwrite($this->socket, $packet);
        if ($written === false) return false; // hard failure: stop instead of looping forever
        $packet = substr($packet, $written);
    } while ($packet !== '' && $packet !== false);
    return $packet_size;
    #endadd
    #disabled
    #return fwrite($this->socket, $packet, $packet_size);
}
It is ugly, but at least it now works flawlessly (for me, anyway). It forces fwrite to write the whole packet, and returns false if it cannot.
A thousand thanks to Sskaje, who put a lot of personal time into actively debugging my problem and stuck with me for quite a long time, and thanks again for his useful MQTT PHP library.
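For readers who want to bound the retries, here is a minimal sketch of the same idea as a standalone function. The name write_all() and the $maxStalls parameter are hypothetical and not part of sskaje/mqtt; an in-memory stream stands in for the broker socket.

```php
<?php
/**
 * Write the whole buffer to a stream, retrying after short writes.
 * Hypothetical helper, not part of sskaje/mqtt.
 *
 * @param resource $stream    open, writable stream
 * @param string   $data      bytes to send
 * @param int      $maxStalls consecutive zero-byte writes tolerated
 * @return int|false total bytes written, or false on failure
 */
function write_all($stream, $data, $maxStalls = 50)
{
    $total  = strlen($data);
    $offset = 0;
    $stalls = 0;

    while ($offset < $total) {
        $written = @fwrite($stream, substr($data, $offset));
        if ($written === false) {
            return false;          // hard error
        }
        if ($written === 0) {
            if (++$stalls > $maxStalls) {
                return false;      // give up instead of spinning forever
            }
            usleep(10000);         // back off 10 ms before retrying
            continue;
        }
        $stalls  = 0;
        $offset += $written;
    }
    return $total;
}

// Usage: an in-memory stream stands in for the socket.
$stream  = fopen('php://memory', 'w+');
$payload = str_repeat('x', 50000);
var_dump(write_all($stream, $payload) === strlen($payload));
```

The stall counter is a design choice: it keeps the retry behaviour of the patch above while guaranteeing the call eventually returns when the peer never becomes writable.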
Sometimes the MQTT client dies with the exception "PHP Fatal error: Uncaught exception 'SPMQTT_Exception' with message 'SUBSCRIBE/SUBACK message identifier mismatch: ........' in z_MQTT_class.php:356"
Debug mode log:
[2014-04-09 19:54:12.882859] socket_connect()
[2014-04-09 19:54:12.886875] socket_connect(): connect to=tcp://127.0.0.1:1883/
[2014-04-09 19:54:12.887093] connect()
[2014-04-09 19:54:12.887205] connect(): clientid=client_id_
[2014-04-09 19:54:12.887257] connect(): keepalive=90
[2014-04-09 19:54:12.887304] connect(): username=gamble0 password=123123
[2014-04-09 19:54:12.887405] Message Build: remaining length=41
[2014-04-09 19:54:12.887463] Message write: message_type=1
[2014-04-09 19:54:12.887526] Message Build: remaining length=41
[2014-04-09 19:54:12.887658] Message write: message=10 29 00 06 4d 51 49 73 64 70 03 c0 00 5a 00 0a 63 6c 69 65 6e 74 5f 69 64 5f 00 07 67 61 6d 62 6c 65 30 00 06 31 32 33 31 32 33
[2014-04-09 19:54:12.887705] Message write: bytes written=43
[2014-04-09 19:54:12.887749] connect(): bytes written=43
[2014-04-09 19:54:12.887795] Message Read: message_type=2
[2014-04-09 19:54:12.887850] Message Read: bytes to read=4
[2014-04-09 19:54:12.887931] Message read: message=20 02 00 00
[2014-04-09 19:54:12.887976] Message Read: bytes to read=4
[2014-04-09 19:54:12.888029] Connected to Broker
[2014-04-09 19:54:12.888264] connect(): connected=1
[2014-04-09 19:54:12.888371] ping()
[2014-04-09 19:54:12.888483] Message write: message_type=12
[2014-04-09 19:54:12.888590] Message Build: remaining length=0
[2014-04-09 19:54:12.889038] Message write: message=c0 00
[2014-04-09 19:54:12.889178] Message write: bytes written=2
[2014-04-09 19:54:12.889270] Message Read: message_type=13
[2014-04-09 19:54:12.889372] Message Read: bytes to read=2
[2014-04-09 19:54:12.889505] Message read: message=d0 00
[2014-04-09 19:54:12.889595] Message Read: bytes to read=2
[2014-04-09 19:54:12.889801] Message PINGRESP: success
[2014-04-09 19:54:12.889895] ping(): response 1
array(2) {
["#"]=>
int(0)
["/$SYS/#"]=>
int(0)
}
[2014-04-09 19:54:12.890398] Message write: message_type=8
[2014-04-09 19:54:12.890499] Message SUBSCRIBE: msgid=35486
[2014-04-09 19:54:12.890604] Message Build: remaining length=16
[2014-04-09 19:54:12.891123] Message write: message=82 10 8a 9e 00 01 23 00 00 07 2f 24 53 59 53 2f 23 00
[2014-04-09 19:54:12.891299] Message write: bytes written=18
[2014-04-09 19:54:12.891470] subscribe(): bytes written=18
[2014-04-09 19:54:12.891684] Message Read: message_type=9
[2014-04-09 19:54:12.891808] Message Read: bytes to read=4
[2014-04-09 19:54:12.891931] Message read: message=90 04 8a 9e
[2014-04-09 19:54:12.892020] Message Read: bytes to read=4
[2014-04-09 19:54:12.892153] Message SUBACK: success
[2014-04-09 19:54:12.892271] loop()
[2014-04-09 19:54:12.928167] checkAndPing()
[2014-04-09 19:54:12.928335] loop(): message_type=3, dup=0, QoS=0, RETAIN=1
[2014-04-09 19:54:12.928404] loop(): remaining length=44 to read=42
[2014-04-09 19:54:12.928506] loop(): read message=31 2c 00 29 67 6f 61 6d 62 6c 65 2f 73 65 6e 73 6f 72 73 2f 31 33 39 37 30 34 37 33 34 36 2f 30 31 39 33 39 38 2f 76 6f 6c 74 61 67 65 36
[2014-04-09 19:54:12.928561] loop(): PUBLISH
[2014-04-09 19:54:12.928622] loop(): PUBLISH QoS=0 PASS
goamble/sensors/1397047346/019398/voltage 6
[2014-04-09 19:54:12.928726] checkAndPing()
[2014-04-09 19:54:12.928792] loop(): message_type=3, dup=0, QoS=0, RETAIN=1
[2014-04-09 19:54:12.928848] loop(): remaining length=40 to read=38
[2014-04-09 19:54:12.928942] loop(): read message=31 28 00 25 67 6f 61 6d 62 6c 65 2f 73 65 6e 73 6f 72 73 2f 31 33 39 37 30 34 37 33 34 36 2f 30 31 39 33 39 38 2f 67 73 6d 34
[2014-04-09 19:54:12.928994] loop(): PUBLISH
[2014-04-09 19:54:12.929048] loop(): PUBLISH QoS=0 PASS
goamble/sensors/1397047346/019398/gsm 4
[2014-04-09 19:54:12.929124] checkAndPing()
[2014-04-09 19:54:12.929188] loop(): message_type=3, dup=0, QoS=0, RETAIN=1
[2014-04-09 19:54:12.929243] loop(): remaining length=41 to read=39
[2014-04-09 19:54:12.929338] loop(): read message=31 29 00 26 67 6f 61 6d 62 6c 65 2f 73 65 6e 73 6f 72 73 2f 31 33 39 37 30 34 37 33 34 36 2f 30 31 39 33 39 38 2f 73 61 74 73 30
[2014-04-09 19:54:12.929391] loop(): PUBLISH
[2014-04-09 19:54:12.929444] loop(): PUBLISH QoS=0 PASS
goamble/sensors/1397047346/019398/sats 0
[2014-04-09 19:54:12.929518] checkAndPing()
[2014-04-09 19:54:12.929581] loop(): message_type=3, dup=0, QoS=0, RETAIN=1
[2014-04-09 19:54:12.929636] loop(): remaining length=54 to read=52
[2014-04-09 19:54:12.929744] loop(): read message=31 36 00 2a 67 6f 61 6d 62 6c 65 2f 73 65 6e 73 6f 72 73 2f 31 33 39 37 30 34 37 33 34 36 2f 30 31 39 33 39 38 2f 6c 61 73 74 5f 75 70 64 31 33 39 37 30 34 37 33 34 36
[2014-04-09 19:54:12.929797] loop(): PUBLISH
[2014-04-09 19:54:12.929851] loop(): PUBLISH QoS=0 PASS
goamble/sensors/1397047346/019398/last_upd 1397047346
[2014-04-09 19:55:02.733819] checkAndPing()
[2014-04-09 19:55:02.733972] checkAndPing(): current_time=1397058902, time=1397058852, keepalive=90
[2014-04-09 19:55:02.734019] ping()
[2014-04-09 19:55:02.734083] Message write: message_type=12
[2014-04-09 19:55:02.734138] Message Build: remaining length=0
[2014-04-09 19:55:02.734223] Message write: message=c0 00
[2014-04-09 19:55:02.734268] Message write: bytes written=2
[2014-04-09 19:55:02.734335] Message Read: message_type=13
[2014-04-09 19:55:02.734389] Message Read: bytes to read=2
[2014-04-09 19:55:02.734457] Message read: message=30 3e
[2014-04-09 19:55:02.734502] Message Read: bytes to read=2
[2014-04-09 19:55:02.734560] Message PINGRESP: type mismatch
[2014-04-09 19:55:02.734605] ping(): response 0
[2014-04-09 19:55:02.734650] loop(): EOF detected
[2014-04-09 19:55:02.734692] reconnect()
[2014-04-09 19:55:02.734733] reconnect(): close current
[2014-04-09 19:55:02.734774] disconnect()
[2014-04-09 19:55:02.734825] Message write: message_type=14
[2014-04-09 19:55:02.734874] Message Build: remaining length=0
[2014-04-09 19:55:02.734947] Message write: message=e0 00
[2014-04-09 19:55:02.734991] Message write: bytes written=2
[2014-04-09 19:55:02.735035] socket_close()
[2014-04-09 19:55:02.735125] socket_connect()
[2014-04-09 19:55:02.735171] socket_connect(): connect to=tcp://127.0.0.1:1883/
[2014-04-09 19:55:02.735318] connect()
[2014-04-09 19:55:02.735379] connect(): clientid=client_id_
[2014-04-09 19:55:02.735425] connect(): keepalive=90
[2014-04-09 19:55:02.735470] connect(): username=gamble0 password=123123
[2014-04-09 19:55:02.735534] Message Build: remaining length=41
[2014-04-09 19:55:02.735582] Message write: message_type=1
[2014-04-09 19:55:02.735639] Message Build: remaining length=41
[2014-04-09 19:55:02.735747] Message write: message=10 29 00 06 4d 51 49 73 64 70 03 c0 00 5a 00 0a 63 6c 69 65 6e 74 5f 69 64 5f 00 07 67 61 6d 62 6c 65 30 00 06 31 32 33 31 32 33
[2014-04-09 19:55:02.735794] Message write: bytes written=43
[2014-04-09 19:55:02.735836] connect(): bytes written=43
[2014-04-09 19:55:02.735879] Message Read: message_type=2
[2014-04-09 19:55:02.735929] Message Read: bytes to read=4
[2014-04-09 19:55:02.736003] Message read: message=20 02 00 00
[2014-04-09 19:55:02.736047] Message Read: bytes to read=4
[2014-04-09 19:55:02.736109] Connected to Broker
[2014-04-09 19:55:02.736152] connect(): connected=1
[2014-04-09 19:55:02.736225] Message write: message_type=8
[2014-04-09 19:55:02.736272] Message SUBSCRIBE: msgid=35673
[2014-04-09 19:55:02.736327] Message Build: remaining length=16
[2014-04-09 19:55:02.736407] Message write: message=82 10 8b 59 00 01 23 00 00 07 2f 24 53 59 53 2f 23 00
[2014-04-09 19:55:02.736451] Message write: bytes written=18
[2014-04-09 19:55:02.736495] subscribe(): bytes written=18
[2014-04-09 19:55:02.736537] Message Read: message_type=9
[2014-04-09 19:55:02.736585] Message Read: bytes to read=4
[2014-04-09 19:55:02.736658] Message read: message=30 3e 00 38
[2014-04-09 19:55:02.736702] Message Read: bytes to read=4
[2014-04-09 19:55:02.736759] Message SUBACK: type mismatch
PHP Fatal error: Uncaught exception 'SPMQTT_Exception' with message 'SUBSCRIBE/SUBACK message identifier mismatch: 35673:' in /wwwroot/borey/htdocs/mqtt/z_MQTT_class.php:356
Stack trace:
#0 /wwwroot/borey/htdocs/mqtt/z_MQTT_class.php(393): spMQTT->subscribe(Array)
#1 -(33): spMQTT->loop('default_subscri...')
#2 {main}
thrown in /wwwroot/borey/htdocs/mqtt/z_MQTT_class.php on line 356
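One plausible reading of the log: where a PINGRESP (d0) and later a SUBACK (90) were expected, the bytes actually read start with 30. The packet type lives in the upper four bits of the first byte, so the sketch below decodes the three values seen in the log. The function name mqtt_packet_type() is hypothetical; the type numbers follow the MQTT 3.1 fixed-header table.

```php
<?php
/**
 * Map the first byte of an MQTT packet to its packet type name.
 * Hypothetical helper for reading the debug log, not library code.
 */
function mqtt_packet_type($firstByte)
{
    static $names = array(
        1 => 'CONNECT',   2 => 'CONNACK',      3 => 'PUBLISH',
        4 => 'PUBACK',    5 => 'PUBREC',       6 => 'PUBREL',
        7 => 'PUBCOMP',   8 => 'SUBSCRIBE',    9 => 'SUBACK',
        10 => 'UNSUBSCRIBE', 11 => 'UNSUBACK', 12 => 'PINGREQ',
        13 => 'PINGRESP', 14 => 'DISCONNECT',
    );
    $type = ($firstByte >> 4) & 0x0f;   // upper nibble = packet type
    return isset($names[$type]) ? $names[$type] : 'RESERVED';
}

// First bytes taken from the log: expected PINGRESP, expected SUBACK, actual.
foreach (array(0xd0, 0x90, 0x30) as $byte) {
    printf("0x%02x => %s\n", $byte, mqtt_packet_type($byte));
}
```

If that reading is right, a PUBLISH from the broker arrived ahead of the expected reply, which would explain both "type mismatch" lines.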
I have mosquitto broker under Debian 7.2 (Linux debian 3.2.0-4-686-pae #1 SMP Debian 3.2.51-1 i686 GNU/Linux):
apt-cache show mosquitto
Package: mosquitto
Version: 1.2.3-0mosquitto2
Architecture: i386
Maintainer: Roger A. Light [email protected]
Installed-Size: 326
Depends: libc6 (>= 2.4), libssl1.0.0 (>= 1.0.1), libwrap0 (>= 7.6-4~), adduser (>= 3.10), lsb-base (>= 4.1+Debian3)
Filename: pool/main/m/mosquitto/mosquitto_1.2.3-0mosquitto2_i386.deb
Size: 118206
Package: mosquitto
Version: 0.15-2
Installed-Size: 188
Maintainer: Roger A. Light [email protected]
Architecture: i386
Depends: libc6 (>= 2.1), libwrap0 (>= 7.6-4~), adduser (>= 3.10), lsb-base (>= 3.2-13)
Filename: pool/main/m/mosquitto/mosquitto_0.15-2_i386.deb
Size: 68278
Hi
I configured Mosquitto to accept TLS v1.2 only and reject TLS v1.0 or v1.1
The library works fine this way, but PHP is poorly documented when it comes to enforcing TLS v1.2 through the socket context. See this page: http://php.net/manual/en/function.stream-socket-enable-crypto.php
It does not mention all of the constants listed at http://php.net/manual/en/migration56.constants.php , especially STREAM_CRYPTO_METHOD_TLSv1_2_SERVER
If this crypto method is not configured in the socket context, PHP cannot connect to a Mosquitto instance configured to accept only TLS v1.2.
I think a note in README.md or a wiki page would help users of your library ensure an acceptable level of security, as TLS versions below 1.2 are now considered weak.
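As an illustration of what such a note could show, here is a minimal sketch of a client-side stream context that pins TLS v1.2. It uses the client counterpart of the constant named above, STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT (PHP 5.6+). The broker address and CA path are placeholders, and how the context is handed to the library is not covered here.

```php
<?php
// Build an SSL stream context that only negotiates TLS v1.2.
$context = stream_context_create(array(
    'ssl' => array(
        'crypto_method'    => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT,
        'verify_peer'      => true,
        'verify_peer_name' => true,
        // 'cafile'        => '/path/to/ca.crt',   // placeholder path
    ),
));

// Placeholder broker address; uncomment to open a real connection.
// $socket = stream_socket_client(
//     'tls://broker.example.com:8883', $errno, $errstr, 10,
//     STREAM_CLIENT_CONNECT, $context
// );

// Confirm the option is stored on the context.
$options = stream_context_get_options($context);
var_dump($options['ssl']['crypto_method'] === STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT);
```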
Hi
I discovered that when the library receives a PUBLISH MQTT message with remaining_length_bytes = 48, an exception is thrown.
Exception while listening MQTT messages :
#0 .../vendor/sskaje/mqtt/mqtt/Message/Base.php(116): sskaje\mqtt\Message\Header\Base->decode('00\x00\x19/5/agent/c3...', 48, 0)
#1 .../vendor/sskaje/mqtt/mqtt/MQTT.php(1318): sskaje\mqtt\Message\Base->decode('00\x00\x19/5/agent/c3...', 48)
#2 .../vendor/sskaje/mqtt/mqtt/MQTT.php(793): sskaje\mqtt\MQTT->message_read()
#3 .../vendor/sskaje/mqtt/mqtt/MQTT.php(779): sskaje\mqtt\MQTT->handle_incoming()
#4 .../vendor/sskaje/mqtt/mqtt/MQTT.php(1135): sskaje\mqtt\MQTT->handle_message()
#5 .../inc/mqttclient.class.php(94): sskaje\mqtt\MQTT->loop()
#6 .../scripts/mqtt.php(37): PluginStorkmdmMqttclient->subscribe()
#7 {main}
Notice that the remaining length equals the first byte of the message (PUBLISH). Because the first and second bytes are the same, strpos reports the position of the first byte instead of the second, and the result does not match $pos.
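The collision can be reproduced in isolation: a QoS 0 PUBLISH has first byte 0x30, and a remaining length of 48 is also encoded as 0x30. The sketch below shows the strpos result and, as one possible alternative, decodes the remaining length by offset using the variable-length scheme from the MQTT spec. decode_remaining_length() is a hypothetical helper, not the library's actual decoder.

```php
<?php
/**
 * Decode the MQTT "remaining length" field starting at a fixed offset.
 * Hypothetical helper; returns array(length, bytes consumed).
 */
function decode_remaining_length($packet, $offset = 1)
{
    $value      = 0;
    $multiplier = 1;
    $used       = 0;
    do {
        // The field is at most 4 bytes long.
        if ($used === 4 || !isset($packet[$offset + $used])) {
            throw new InvalidArgumentException('Malformed or truncated remaining length');
        }
        $byte   = ord($packet[$offset + $used]);
        $value += ($byte & 0x7f) * $multiplier;   // low 7 bits carry data
        $multiplier *= 128;
        $used++;
    } while ($byte & 0x80);                       // high bit = more bytes follow
    return array($value, $used);
}

// PUBLISH (0x30) with remaining length 48 (also 0x30), then 48 payload bytes.
$packet = "\x30\x30" . str_repeat('a', 48);

var_dump(strpos($packet, chr(48)));   // int(0): matches the header byte, not offset 1
list($length, $used) = decode_remaining_length($packet);
var_dump($length, $used);             // int(48), int(1)
```

Reading by offset avoids searching for a byte value at all, so it does not matter which bytes happen to be equal.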
Hi
The library keeps sending pings even though the connection has been closed.
Here is the relevant backtrace, repeated over and over, many times per second:
*** PHP Notice(8): fwrite(): send of 2 bytes failed with errno=32 Broken pipe
Backtrace:
.../lib/mqtt/mqtt/SocketClient.php:166 fwrite()
.../lib/mqtt/mqtt/MQTT.php:1230 sskaje\mqtt\SocketClient->write()
.../lib/mqtt/mqtt/MQTT.php:1214 sskaje\mqtt\MQTT->message_write()
.../lib/mqtt/mqtt/MQTT.php:1191 sskaje\mqtt\MQTT->simpleCommand()
.../lib/mqtt/mqtt/MQTT.php:1175 sskaje\mqtt\MQTT->ping()
.../lib/mqtt/mqtt/MQTT.php:1132 sskaje\mqtt\MQTT->keepalive()
.../inc/mqttclient.class.php:97 sskaje\mqtt\MQTT->loop()
In sskaje\mqtt\MQTT->ping() there is no check on the number of bytes actually written, so the library cannot detect the broken pipe.
I guess the library enters an endless loop because no MessageHandler callback can run to unsubscribe from all topics or close the connection.
Maybe sskaje\mqtt\MQTT->ping() should throw an exception here.
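A minimal sketch of the suggested check, written as a standalone function rather than a patch to the library. BrokenConnectionException and send_pingreq() are hypothetical names; the two bytes c0 00 are the PINGREQ packet seen in the library's debug output.

```php
<?php
// Hypothetical exception type for a connection that can no longer be written to.
class BrokenConnectionException extends RuntimeException {}

/**
 * Send a PINGREQ and fail loudly when it cannot be written in full.
 * Hypothetical helper, not the library's ping() implementation.
 *
 * @param resource $stream connected stream
 * @return int bytes written (always 2 on success)
 */
function send_pingreq($stream)
{
    $packet  = "\xc0\x00";                 // PINGREQ, remaining length 0
    $written = @fwrite($stream, $packet);
    if ($written !== strlen($packet)) {
        // false, 0 or a short count all mean the ping did not go out.
        throw new BrokenConnectionException(
            'PINGREQ could not be written; the connection is probably closed'
        );
    }
    return $written;
}

// Usage: an in-memory stream stands in for the broker socket.
$stream = fopen('php://memory', 'w+');
var_dump(send_pingreq($stream));
```

With an exception in place, the caller's loop can catch it and decide whether to reconnect or stop, instead of retrying the ping many times per second.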
Hi,
I tried your beautiful library and it works fine, but none of the messages have the timestamp set, and ActiveMQ reports a Timestamp header of 1970-01-01T01:00:00+01:00.
Is there a way to set the timestamp from the library, or is this a bug?
Thanks
Roberto
Hi
I'm starting to replace an MQTT library that lacks QoS. This one seems promising; thank you very much.
I noticed the methods do_subscribe() and do_unsubscribe() are protected, whereas do_publish() is public.
I wanted to point it out because I believe it may have been intended to be protected too.
There are a lot of methods in subscribe.php:
$publish_object->getMsgID(),
$publish_object->getQoS(),
$publish_object->getDup(),
$publish_object->getTopic(),
$publish_object->getMessage()
I need to know the client ID of the client that published the message. Is that possible?
Does it support MQTT v3.3.1?
Sskaje, thank you for the quick action!
The problem with the uncaught exception is fixed, but it seems QoS 1/2 doesn't work.
For example: I start the subscribe script and, once it has subscribed (with QoS 1 or 2), I kill it. Then I run the publish script, which sends some messages with QoS 1 or 2, and kill it too.
After that I run the same subscribe script again (with QoS 1 or 2 and the same client_id) and no messages arrive...
Where is my mistake?
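One thing worth ruling out in a scenario like this is the Clean Session flag. Under the MQTT spec, a broker only keeps subscriptions and queued QoS 1/2 messages for an offline client that connected with Clean Session = 0 and later reconnects with the same client ID. The sketch below decodes the connect-flags byte of a CONNECT packet; decode_connect_flags() is a hypothetical helper, and the two sample values are the flag bytes visible in the CONNECT dumps quoted elsewhere on this page.

```php
<?php
/**
 * Decode the connect-flags byte of an MQTT 3.1 CONNECT packet.
 * Hypothetical helper for inspecting debug dumps.
 */
function decode_connect_flags($flags)
{
    return array(
        'username'      => (bool) ($flags & 0x80),
        'password'      => (bool) ($flags & 0x40),
        'will_retain'   => (bool) ($flags & 0x20),
        'will_qos'      => ($flags >> 3) & 0x03,
        'will'          => (bool) ($flags & 0x04),
        'clean_session' => (bool) ($flags & 0x02),
    );
}

foreach (array(0xc0, 0xc2) as $flags) {
    $decoded = decode_connect_flags($flags);
    printf(
        "0x%02x clean_session=%s\n",
        $flags,
        $decoded['clean_session'] ? 'yes' : 'no'
    );
}
```

Whether this is the cause here cannot be told from the report alone; the flag byte in the debug output of the subscribe script would settle it.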
Hi
I use multiple clients to subscribe to the same channel. Sometimes some clients hit this exception while others work just fine. Could you help explain why this happens and how I can deal with it?
[2019-02-14 02:49:46.236260] INFO loop(): received PUBLISH
[2019-02-14 02:49:46.236296] DEBUG loop(): PUBLISH QoS=0 PASS
I got a message:(msgid=0, QoS=0, dup=0, topic=status, command=GRADE_HOSE_TANK) {"command":"GRADE_HOSE_TANK","status":"OK","stationId":212,"dispenserNum":5,"hoseNum":0,"gradeNum":12,"tankNum":5}
[2019-02-14 02:49:46.280716] DEBUG socket_connect()
[2019-02-14 02:49:46.280782] DEBUG socket_connect(): connect to={my url}
[2019-02-14 02:49:46.471183] INFO connect(): Connection established.
[2019-02-14 02:49:46.471316] DEBUG SOCKET: blocking mode: ON
[2019-02-14 02:49:46.471409] DEBUG connect(): clientid={clientId}
[2019-02-14 02:49:46.471459] DEBUG connect(): keepalive=60
[2019-02-14 02:49:46.471496] DEBUG connect(): username={my username} password={my password}
[2019-02-14 02:49:46.471544] DEBUG Message write: message_type=CONNECT
[2019-02-14 02:49:46.471731] DEBUG Message Build: total length=59
[2019-02-14 02:49:46.472173] DEBUG socket_write(length=59)
DUMP 10 39 00 06 4d 51 49 73 64 70 03 c2 00 3c 00 14 .9..MQIsdp...<..
DUMP 6d 71 74 74 39 33 33 39 36 64 64 37 38 37 35 39 mqtt93396dd78759
DUMP 63 31 31 37 00 05 73 61 76 65 6f 00 0e 65 64 67 c117..saveo..edg
DUMP 65 2d 77 6f 72 6b 73 2e 6e 65 74 e-works.net
[2019-02-14 02:49:46.472319] DEBUG connect(): bytes written=59
[2019-02-14 02:49:46.472367] DEBUG socket_read(2)
Fatal error: Uncaught exception 'sskaje\mqtt\Exception' with message 'WTFFFFFF!!!! ' in /var/www/html/api/library/mqtt/mqtt/MQTT.php on line 1275
sskaje\mqtt\Exception: WTFFFFFF!!!! in /var/www/html/api/library/mqtt/mqtt/MQTT.php on line 1275
Call Stack:
0.0003 243288 1. {main}() /var/www/html/api/app/cli.php:0
0.0007 257488 2. Phalcon\Cli\Console->handle() /var/www/html/api/app/cli.php:69
0.0008 261960 3. Phalcon\Dispatcher->dispatch() /var/www/html/api/app/cli.php:69
I am able to publish using publish.php. But when I run subscribe.php, the page keeps loading and I never see any result. Can you please tell me what I am doing wrong?
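When testing inside the Yii framework:
E:\yiqing-workspace\my-github\aheadmall\console>yii
PHP Strict Warning 'yii\base\ErrorException' with message 'Declaration of console\controllers\MySubscribeCallback::publish() should be compatible with sskaje\mqtt\MessageHandler::publish(sskaje\mqtt\MQTT $mqtt, sskaje\mqtt\Message\PUBLISH $publish_object)'
This error is reported by PHP 5.6.
To save myself the trouble, I went straight into your base class and removed the parameter type hint: Message\PUBLISH $publish_object => $publish_object
Your type hint here takes a shortcut with the namespace and does not carry the full prefix ^_^ I am not sure whether that is the cause.
The MySubscribeCallback class was copied straight from your test file into a file in my own namespace.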
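A possible alternative to editing the base class is to import the library's names into the new namespace, so that the relative hint Message\PUBLISH still resolves to sskaje\mqtt\Message\PUBLISH. The sketch below is self-contained: the three sskaje\mqtt classes are empty stand-ins named after the types in the warning, not the real library code.

```php
<?php
// Stand-ins for the library types named in the warning. The real
// classes come from sskaje/mqtt; these stubs only make the sketch runnable.
namespace sskaje\mqtt\Message;

class PUBLISH {}

namespace sskaje\mqtt;

class MQTT {}

class MessageHandler
{
    public function publish(MQTT $mqtt, Message\PUBLISH $publish_object) {}
}

// The application's own namespace, as in the report.
namespace console\controllers;

use sskaje\mqtt\MQTT;
use sskaje\mqtt\MessageHandler;
use sskaje\mqtt\Message;   // makes Message\PUBLISH resolve to the library class

class MySubscribeCallback extends MessageHandler
{
    // Same parameter types as the parent, so the declarations are compatible.
    public function publish(MQTT $mqtt, Message\PUBLISH $publish_object)
    {
        echo get_class($publish_object), "\n";
    }
}

$handler = new MySubscribeCallback();
$handler->publish(new MQTT(), new Message\PUBLISH());
```

Without the use lines, Message\PUBLISH inside console\controllers would resolve to console\controllers\Message\PUBLISH, a different type from the one the parent method declares.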
Hi
If CONNACK contains 0x00 0x05, the spec says it means "Connection Refused: not authorized".
When this happens, the code returns ERROR Connection failed! (Error: 0x00 0x05|Unknown error)
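For reference, a minimal sketch of a lookup covering the six CONNACK return codes defined by MQTT 3.1, so that 0x05 no longer falls through to "Unknown error". connack_message() is a hypothetical helper, not the library's own function.

```php
<?php
/**
 * Translate a CONNACK return code into the wording used by the MQTT 3.1 spec.
 * Hypothetical helper for illustration.
 */
function connack_message($code)
{
    static $messages = array(
        0x00 => 'Connection Accepted',
        0x01 => 'Connection Refused: unacceptable protocol version',
        0x02 => 'Connection Refused: identifier rejected',
        0x03 => 'Connection Refused: server unavailable',
        0x04 => 'Connection Refused: bad user name or password',
        0x05 => 'Connection Refused: not authorized',
    );
    return isset($messages[$code]) ? $messages[$code] : 'Unknown error';
}

// CONNACK packet: fixed header 20 02, then the two bytes 00 05.
$connack = "\x20\x02\x00\x05";
echo connack_message(ord($connack[3])), "\n";
```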
Is there a puback() method in mqtt/mqtt/MessageHandler.php?
How do I use it to confirm receipt of a message?
So, when running composer require sskaje/mqtt dev-master
it installs the version at commit 38cf01a from 16 Mar 2018. The latest pull requests are not included.
In a high-load environment, v2.0 often reports the following error:
fwrite(): send of 2 bytes failed with errno=32 Broken pipe file:
but the old v1.0 basically works well.