zentures / surgemq
(Unmaintained) High-Performance MQTT Server and Client Libraries
License: Apache License 2.0
Hello!
One more feature request: could you please add support for WebSockets (standard and secure) in the broker, so we can implement "MQTT over WebSockets" natively with SurgeMQ? Combined with Paho's JS library, we could build very modern web applications with the benefits of MQTT.
Thanks!
I built the "surgemq" example on a Raspberry Pi 2 with Debian Jessie and get this
E0221 22:25:38.464626 3586 service.go:193/func·008] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
E0221 22:25:38.468213 3586 sendrecv.go:33/func·003] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
E0221 22:25:38.468502 3586 sendrecv.go:74/func·004] (1/mosqpub/3594-jessie-rpi) Recovering from panic: runtime error: invalid memory address or nil pointer dereference
^CE0221 22:25:45.004451 3586 surgemq.go:77/func·001] Existing due to trapped signal; interrupt
when I execute mosquitto_pub -r -m "test by mr" -t 'test/topic'.
I had to change the example, though, as otherwise I got more errors. My imports now look like this:
"github.com/surgemq/surgemq/service"
"github.com/surge/glog"
I am trying to authenticate a user when he subscribes to a channel
I'm running an MQTT client that is able to connect to the public MQTT brokers at broker.mqtt-dashboard.com and iot.eclipse.org, but when I try to connect the client to a stand-alone surgemq broker running on my own machine it fails.
The broker runs without error messages, so I guess it is listening for connections. My client however, cannot connect.
Does anyone have an idea what might be wrong with my setup?
I want to use this as my MQTT broker, but how can I persist the session, topics, and messages to Redis?
Can anyone give me some suggestions? Thanks!
Hi,
I was wondering why the length check here limits the client id to max 23 bytes.
The 3.1.1 spec says you must allow client IDs with a length between 1 and 23 bytes, but it doesn't say you can't have client IDs longer than 23 bytes, right? Or am I misunderstanding?
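A minimal sketch of a more lenient check, for illustration only. MQTT 3.1.1 (section 3.1.3.1) requires servers to accept client IDs of 1 to 23 bytes and permits them to accept longer ones; the 2-byte string length prefix caps any ID at 65535 bytes. The names validClientID and maxClientIDLen are hypothetical and are not SurgeMQ's actual code.

```go
package main

import "fmt"

// maxClientIDLen is a hypothetical upper bound. MQTT strings carry a
// 2-byte length prefix, so 65535 is the hard protocol ceiling.
const maxClientIDLen = 65535

// validClientID is an illustrative helper, not SurgeMQ's actual check.
// Zero-length IDs are a separate case in the spec and are rejected here
// for simplicity.
func validClientID(id []byte) bool {
	return len(id) >= 1 && len(id) <= maxClientIDLen
}

func main() {
	fmt.Println(validClientID([]byte("short-id")))
	fmt.Println(validClientID(make([]byte, 64)))
	fmt.Println(validClientID(nil))
}
```

A broker that wants strict behaviour could set the bound back to 23; the point is only that the spec leaves the choice to the server.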
As the subject says ... I saw in the godoc dependency graph that SurgeMQ pulls in gorilla/websocket (see service/sendrecv.go) - and was curious as to why this is needed?
Great project btw - impressively well done, IMO.
-jcw
When the ack cycle completes, the OnComplete function is called. How should an ack timeout be handled?
I looked into the bridge in emqttd, but there are not enough details for me. I see you plan to support bridging in the future, so I would like to know the details: is the bridge an MQTT client that proxies the real clients?
How do I configure a SurgeMQ cluster? Thanks.
I didn't notice it at first, as I used an idiomatic channel approach for a while, but when I was stuck at 100k msg/sec, I took another look at your ring buffer.
In general I really like your buffer (and it's amazingly fast 👍) but I've found some serious race conditions with your cursor locks.
They were hard to find because they occur really rarely on my machine (1 out of 10k messages or so in a strictly serial 1-write-1-read test), but when they do, your broker explodes.
Just take a look at ReadWait and WriteCommit.
func (this *buffer) WriteCommit(n int) (int, error) {
	start, cnt, err := this.waitForWriteSpace(n)
	if err != nil {
		return 0, err
	}
	// If we are here then there's enough bytes to commit
	this.pseq.set(start + int64(cnt))
	this.ccond.L.Lock()
	this.ccond.Broadcast()
	this.ccond.L.Unlock()
	return cnt, nil
}

func (this *buffer) ReadWait(n int) ([]byte, error) {
	if int64(n) > this.size {
		return nil, bufio.ErrBufferFull
	}
	if n < 0 {
		return nil, bufio.ErrNegativeCount
	}
	cpos := this.cseq.get()
	ppos := this.pseq.get()
	// This is the magic read-to position. The producer position must be equal or
	// greater than the next position we read to.
	next := cpos + int64(n)
	// If there's no data, then let's wait until there is some data
	this.ccond.L.Lock()
	for ; next > ppos; ppos = this.pseq.get() {
		if this.isDone() {
			return nil, io.EOF
		}
		this.ccond.Wait()
	}
	this.ccond.L.Unlock()
	// If we are here that means we have at least n bytes of data available.
	cindex := cpos & this.mask
	// If cindex (index relative to buffer) + n is more than buffer size, that means
	// the data wrapped
	if cindex+int64(n) > this.size {
		// reset the tmp buffer
		this.tmp = this.tmp[0:0]
		l := len(this.buf[cindex:])
		this.tmp = append(this.tmp, this.buf[cindex:]...)
		this.tmp = append(this.tmp, this.buf[0:n-l]...)
		return this.tmp[:n], nil
	}
	return this.buf[cindex : cindex+int64(n)], nil
}
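The race scenario: the consumer executes ppos := this.pseq.get() and gets a stale ppos. Before the consumer locks ccond, the producer updates ppos, locks ccond, notifies sleeping consumers and finishes. The consumer then waits for a notification that has already been sent.
Simple fix:
As you'll acquire the consumer lock anyway, you should read and write the producer position while holding the consumer lock.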
I haven't checked the other methods yet, but chances are good that there are more races like this one as you always use atomic operations without locking the parties.
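The suggested fix can be sketched roughly as follows. This is a stripped-down stand-in, not the SurgeMQ buffer: the type ring and the methods commit and waitFor are hypothetical, and the producer position is a plain int64 guarded by the same mutex the condition variable uses, so an update can never slip in between the consumer's check and its Wait.

```go
package main

import (
	"fmt"
	"sync"
)

// ring is a hypothetical, minimal stand-in for the buffer's cursor handling.
type ring struct {
	mu    sync.Mutex
	ccond *sync.Cond
	ppos  int64 // producer position, only touched while holding mu
}

func newRing() *ring {
	r := &ring{}
	r.ccond = sync.NewCond(&r.mu)
	return r
}

// commit advances the producer position and wakes consumers, all under
// the condition variable's lock.
func (r *ring) commit(n int64) {
	r.mu.Lock()
	r.ppos += n
	r.ccond.Broadcast()
	r.mu.Unlock()
}

// waitFor blocks until the producer position has reached next and
// returns the position it observed.
func (r *ring) waitFor(next int64) int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	for r.ppos < next {
		r.ccond.Wait()
	}
	return r.ppos
}

func main() {
	r := newRing()
	go r.commit(8)
	fmt.Println(r.waitFor(8))
}
```

The trade-off is that the producer position is no longer updated lock-free, which may cost some throughput compared with the atomic version.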
No updates for quite some time, and the home page (http://surgemq.com) is not found.
Is this project dead?
I've got a problem here.
Sometimes ReadPeek is not woken up after a buffer.WriteCommit call when I run 25000 clients.
Could it be that ccond.Wait is sometimes not notified?
Q: Is it possible to publish topics to the server before it starts serving network requests?
This might make it possible to implement persistent retains as a separate process: i.e. log all retained messages to file or some database via a listener subscribed to "#", then on startup have the server go through the saved data and "pre-publish" them as a way to prime the in-memory cache of SurgeMQ.
-jcw
When I send a big message I get an error:
utils/writeLPBytes: Length (452624) greater than 65535 bytes
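For context, a hedged sketch of how a length-prefixed field is typically encoded. In MQTT 3.1.1, fields such as topic names carry a 2-byte big-endian length prefix, which is where a 65535-byte ceiling comes from; the PUBLISH payload itself has no such prefix and is bounded by the packet's remaining length instead. The function below is an illustrative re-implementation, not SurgeMQ's writeLPBytes, and which field triggered the error in the report above is not stated.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errTooLong is a hypothetical error for fields that do not fit the prefix.
var errTooLong = errors.New("length-prefixed field exceeds 65535 bytes")

// writeLPBytes appends b to dst, preceded by its length as a 2-byte
// big-endian integer. Illustrative only.
func writeLPBytes(dst, b []byte) ([]byte, error) {
	if len(b) > 65535 {
		return dst, errTooLong
	}
	var hdr [2]byte
	binary.BigEndian.PutUint16(hdr[:], uint16(len(b)))
	dst = append(dst, hdr[:]...)
	return append(dst, b...), nil
}

func main() {
	out, err := writeLPBytes(nil, []byte("abc"))
	fmt.Println(out, err)

	_, err = writeLPBytes(nil, make([]byte, 452624))
	fmt.Println(err)
}
```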
Hi
Could you please tell me how to turn on authorization on the server, and where the data of authorized users (user name and password hash) will be stored?
I think consul.io would possibly make a good storage back-end to add to the current in-memory back-end.
Thoughts?
Hello,
a page at http://godoc.org/github.com/surge/surgemq gives a "Not Found" error.
$ cd $GOPATH/src/github.com/surgemq/surgemq/examples/surgemq
$ go run *.go
then, go run this code with -logtostderr -vv 3:
package main

import (
	"flag"
	"log"

	"github.com/surgemq/message"
	"github.com/surgemq/surgemq/service"
)

func main() {
	flag.Parse()
	c := &service.Client{}
	cmsg := message.NewConnectMessage()
	cmsg.SetVersion(4)
	cmsg.SetCleanSession(true)
	cmsg.SetKeepAlive(10)
	if err := c.Connect("tcp://localhost:1883", cmsg); err != nil {
		log.Fatalln("Connect:", err)
	}
	fn := func(msg, ack message.Message, err error) error {
		log.Println("Pong:", err)
		return err
	}
	done := make(chan struct{})
	if err := c.Ping(service.OnCompleteFunc(fn)); err != nil {
		log.Fatalln("Ping:", err)
		close(done)
	}
	<-done
}
I1204 23:16:44.348778 13936 sendrecv.go:103/sender] (1/) Starting sender
I1204 23:16:44.348781 13936 process.go:46/processor] (1/) Starting processor
I1204 23:16:44.348883 13936 sendrecv.go:58/receiver] (1/) Starting receiver
I1204 23:16:44.349183 13936 service.go:210/stop] (1/) closing this.conn
I1204 23:16:44.349223 13936 sendrecv.go:100/func1] (1/) Stopping sender
E1204 23:16:44.349252 13936 sendrecv.go:76/receiver] (1/) error reading from connection: read tcp [::1]:61472->[::1]:1883: use of closed network connection
I1204 23:16:44.349364 13936 sendrecv.go:55/func1] (1/) Stopping receiver
I1204 23:16:44.349375 13936 service.go:220/stop] (1/) Received 16 bytes in 2 messages.
I1204 23:16:44.349383 13936 service.go:221/stop] (1/) Sent 6 bytes in 2 messages.
In case of Client.Ping(), ackmsg.Msgbuf is empty in service.processAcked().
As a result, msg.Decode(ackmsg.Msgbuf) panics with "slice bounds out of range".
Hello!
Feature request for the future: would you consider adding SSL/TLS support, so that MQTT in SurgeMQ can work over SSL/TLS?
Thanks!
Running the example code as per the docs throws an error:
Server:
Client:
Env: go version go1.7.3 darwin/amd64
I use SurgeMQ as an MQTT broker. When I send messages with two publishers at the same time to a subscriber with QoS 2, the subscriber loses some messages. I am trying to find out why. There seems to be a bug in ackqueue.go, because when I use QoS 0 or 1, no messages are lost. Please help!
Hello,
A while ago my colleagues and I had some fun testing MQTT servers we wrote: http://goo.gl/RorFlv
The tools we use for stressing our implementations are pingtest and loadtest:
go get code.google.com/p/jra-go/mqtt/pingtest
go get code.google.com/p/jra-go/mqtt/loadtest
pingtest currently fails with surgemq, because something bad happens during disconnect. You can even see it with "pingtest -pairs=1"
I have not investigated yet if this is from my client, but because we have used it with many many server implementations, it is more likely a problem with surgemq.
I'm writing an MQTT client using the surgemq package.
service.Client panics with a "topics: Register called twice for provider " message.
$ cd $GOPATH/src/github.com/surgemq/surgemq/examples/surgemq
$ go run *.go
then, go run:
package main

import (
	"log"

	"github.com/surgemq/message"
	"github.com/surgemq/surgemq/service"
)

func Connect() (*service.Client, error) {
	c := &service.Client{}
	msg := message.NewConnectMessage()
	msg.SetVersion(4)
	msg.SetCleanSession(true)
	msg.SetKeepAlive(10)
	if err := c.Connect("tcp://localhost:1883", msg); err != nil {
		return nil, err
	}
	return c, nil
}

func main() {
	client, err := Connect()
	if err != nil {
		log.Fatal(err)
	}
	client.Disconnect()
	client, err = Connect()
	client.Disconnect()
}
Is there any plan to support an ACL plug-in feature?
An ACL would control which client can subscribe to which kind of topic.
The QosFailure is never returned to the client
The Subscribe function in memtopics.go always returns message.QosFailure together with an error.
https://github.com/influxdata/surgemq/blob/master/topics/memtopics.go#L61
func (this *memTopics) Subscribe(topic []byte, qos byte, sub interface{}) (byte, error) {
	if !message.ValidQos(qos) {
		return message.QosFailure, fmt.Errorf("Invalid QoS %d", qos)
	}
	if sub == nil {
		return message.QosFailure, fmt.Errorf("Subscriber cannot be nil")
	}
	this.smu.Lock()
	defer this.smu.Unlock()
	if qos > MaxQosAllowed {
		qos = MaxQosAllowed
	}
	if err := this.sroot.sinsert(topic, qos, sub); err != nil {
		return message.QosFailure, err
	}
	return qos, nil
}
But in the processSubscribe function in process.go, when Subscribe returns an error, QosFailure is not appended to retcodes; the function simply returns the error:
https://github.com/influxdata/surgemq/blob/master/service/process.go#L315
for i, t := range topics {
	rqos, err := this.topicsMgr.Subscribe(t, qos[i], &this.onpub)
	if err != nil {
		return err
	}
	this.sess.AddTopic(string(t), qos[i])
	retcodes = append(retcodes, rqos)
	// yeah I am not checking errors here. If there's an error we don't want the
	// subscription to stop, just let it go.
	this.topicsMgr.Retained(t, &this.rmsgs)
	glog.Debugf("(%s) topic = %s, retained count = %d", this.cid(), string(t), len(this.rmsgs))
}
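A rough sketch of the behaviour the report asks for: record the failure code for the offending topic and keep going, so the SUBACK still carries one return code per requested topic. The constant 0x80 is the SUBACK failure code from MQTT 3.1.1; subscribe and buildRetcodes are hypothetical stand-ins and not the SurgeMQ functions.

```go
package main

import (
	"errors"
	"fmt"
)

// qosFailure is the SUBACK failure return code defined by MQTT 3.1.1.
const qosFailure byte = 0x80

// subscribe is a hypothetical stand-in for the topics manager's Subscribe.
func subscribe(topic string, qos byte) (byte, error) {
	if qos > 2 {
		return qosFailure, errors.New("invalid QoS")
	}
	return qos, nil
}

// buildRetcodes returns one return code per topic, recording a failure
// code instead of aborting the whole SUBSCRIBE.
func buildRetcodes(topics []string, qos []byte) []byte {
	retcodes := make([]byte, 0, len(topics))
	for i, t := range topics {
		rqos, err := subscribe(t, qos[i])
		if err != nil {
			retcodes = append(retcodes, qosFailure)
			continue
		}
		retcodes = append(retcodes, rqos)
	}
	return retcodes
}

func main() {
	fmt.Println(buildRetcodes([]string{"a/b", "c/d"}, []byte{1, 3}))
}
```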
I use this library to start a broker, and I run this command: mosquitto_sub -v -t \$SYS/#
It shows this error:
Error processing SUBSCRIBE: memtopics/nextTopicLevel: Cannot publish to $ topics
Is this a bug?
Hi, I tested SurgeMQ with the Python Paho client.
I got this error log:
E0424 18:53:12.538812 16826 process.go:55/processor] (2/testing) Error peeking next message size: EOF
E0424 18:53:44.130772 16826 sendrecv.go:55/receiver] (1/testingsub) error reading from connection: read tcp 127.0.0.1:52919: i/o timeout
E0424 18:53:44.130878 16826 process.go:55/processor] (1/testingsub) Error peeking next message size: EOF
E0424 18:54:45.138582 16826 sendrecv.go:55/receiver] (3/testingsub) error reading from connection: read tcp 127.0.0.1:36188: i/o timeout
E0424 18:54:45.139055 16826 process.go:55/processor] (3/testingsub) Error peeking next message size: EOF
E0424 18:55:32.609175 16826 process.go:55/processor] (5/testing) Error peeking next message size: EOF
E0424 18:55:35.389322 16826 sendrecv.go:55/receiver] (6/testing) error reading from connection: read tcp 127.0.0.1:37169: connection reset by peer
E0424 18:55:35.389759 16826 process.go:55/processor] (6/testing) Error peeking next message size: EOF
E0424 18:55:46.143154 16826 sendrecv.go:55/receiver] (4/testingsub) error reading from connection: read tcp 127.0.0.1:44035: i/o timeout
E0424 18:55:46.143265 16826 process.go:55/processor] (4/testingsub) Error peeking next message size: EOF
E0424 18:56:47.149011 16826 sendrecv.go:55/receiver] (7/testingsub) error reading from connection: read tcp 127.0.0.1:47237: i/o timeout
E0424 18:56:47.149102 16826 process.go:55/processor] (7/testingsub) Error peeking next message size: EOF
E0424 18:57:48.206160 16826 sendrecv.go:55/receiver] (8/testingsub) error reading from connection: read tcp 127.0.0.1:41006: i/o timeout
E0424 18:57:48.206253 16826 process.go:55/processor] (8/testingsub) Error peeking next message size: EOF
E0424 18:58:49.227415 16826 sendrecv.go:55/receiver] (9/testingsub) error reading from connection: read tcp 127.0.0.1:37368: i/o timeout
E0424 18:58:49.227507 16826 process.go:55/processor] (9/testingsub) Error peeking next message size: EOF
As you can see, I got a timeout that closed the subscriber script's connection, although MQTT has a reconnect feature,
and an "Error peeking next message size" error while receiving a publish or sending a subscription.
Every operation on *Session.Cmsg in session.go looks like this:
this.mu.Lock()
defer this.mu.Unlock()
blabla~~~
But in service.go, lines 237-241:
// Publish will message if WillFlag is set. Server side only.
if !this.client && this.sess.Cmsg.WillFlag() {
	glog.Infof("(%s) service/stop: connection unexpectedly closed. Sending Will.", this.cid())
	this.onPublish(this.sess.Will)
}
there is no lock held when operating on this.sess.Cmsg.
My opinion is: make Session.Cmsg private, replace direct uses of this.sess.Cmsg with locked accessor methods, and change sync.Mutex to sync.RWMutex:
func (this *Session) WillFlag() bool {
	this.mu.Lock()
	defer this.mu.Unlock()
	return this.cmsg.WillFlag()
}

func (this *Session) SetWillFlag(v bool) {
	this.mu.Lock()
	defer this.mu.Unlock()
	this.cmsg.SetWillFlag(v)
}

func (this *Session) CleanSession() bool {
	this.mu.Lock()
	defer this.mu.Unlock()
	return this.cmsg.CleanSession()
}
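If the mutex is switched to sync.RWMutex as proposed, read-only accessors could take the read lock so that concurrent readers do not block each other. A minimal sketch, assuming a simplified session type in which a plain bool stands in for the wrapped CONNECT message; the type and field names are illustrative.

```go
package main

import (
	"fmt"
	"sync"
)

// session is a hypothetical, minimal stand-in for the Session type.
type session struct {
	mu       sync.RWMutex
	willFlag bool // stands in for the private CONNECT message
}

// WillFlag only reads state, so a shared read lock is enough.
func (s *session) WillFlag() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.willFlag
}

// SetWillFlag mutates state and therefore takes the exclusive lock.
func (s *session) SetWillFlag(v bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.willFlag = v
}

func main() {
	s := &session{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.SetWillFlag(true)
			_ = s.WillFlag()
		}()
	}
	wg.Wait()
	fmt.Println(s.WillFlag())
}
```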
Another instance of this problem is in memtopics.go, lines 128-130:
func (this *memTopics) Close() error {
	this.sroot = nil
	this.rroot = nil
	return nil
}
It should change to:
func (this *memTopics) Close() error {
	this.smu.Lock()
	this.sroot = nil
	this.smu.Unlock()
	this.rmu.Lock()
	this.rroot = nil
	this.rmu.Unlock()
	return nil
}
I'll open a pull request later when I finish coding.
Ping breaks at header.go line 197.
I added a length check there: if len(src) == 0 { return total, nil }
P.S. Please excuse the Google-translation-level English.
As queried https://gophers.slack.com/archives/C03QQ3L05/p1498435372556590 and as responded https://gophers.slack.com/archives/C03QQ3L05/p1498586558298717
Just in case you too were also wondering :)
The following code fails, always after some 42 or 43 seconds (go 1.5.3, Mac OSX 10.11.3):
package main

import (
	"flag"
	"time"

	"github.com/surge/glog"
	"github.com/surgemq/message"
	"github.com/surgemq/surgemq/service"
)

func main() {
	flag.Parse()
	defer glog.Flush()
	go func() {
		srv := service.Server{}
		glog.Info("starting MQTT server at :12345")
		glog.Fatal(srv.ListenAndServe("tcp://:12345"))
	}()
	// wait a bit, the internal MQTT server is still starting up
	time.Sleep(time.Second)
	client := &service.Client{}
	msg := message.NewConnectMessage()
	msg.SetVersion(4)
	msg.SetCleanSession(true)
	msg.SetClientId([]byte("me"))
	if err := client.Connect("tcp://:12345", msg); err != nil {
		glog.Fatal(err)
	}
	glog.Infoln("connected to port 12345")
	for t := range time.Tick(time.Second) {
		glog.Info(t)
		msg := message.NewPublishMessage()
		msg.SetTopic([]byte("abc"))
		msg.SetPayload([]byte("def"))
		e := client.Publish(msg, nil)
		if e != nil {
			glog.Fatal("publish: ", e)
		}
	}
}
Sample error output from go run blah.go -logtostderr:
...
I0122 21:40:55.862248 11954 blah.go:37/main] 2016-01-22 21:40:55.862186826 +0100 CET
I0122 21:40:56.865790 11954 blah.go:37/main] 2016-01-22 21:40:56.8656395 +0100 CET
I0122 21:40:57.864022 11954 blah.go:37/main] 2016-01-22 21:40:57.863648731 +0100 CET
E0122 21:40:58.861597 11954 sendrecv.go:77/receiver] (2/me) error reading from connection: read tcp 127.0.0.1:52627->127.0.0.1:12345: i/o timeout
I0122 21:40:58.862059 11954 blah.go:37/main] 2016-01-22 21:40:58.861395068 +0100 CET
F0122 21:40:58.862185 11954 blah.go:44/main] publish: (2/me) Error sending PUBLISH message: service: buffer is not ready
exit status 255
I can't figure out what is causing this problem. Can anyone else reproduce this on a different system?
What's MPS's full name?
If the server receives a PUBLISH message whose RETAIN flag is 1 and whose QoS is 0, it must drop all retained messages for that topic. But the code does not do this.
Why?
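For reference, a hedged sketch of the retain rules as described in MQTT 3.1.1 section 3.3.1.3: a retained PUBLISH replaces whatever was previously retained for its topic (for QoS 0 the server must discard the previous message and should store the new one), and a retained PUBLISH with a zero-byte payload clears the topic. The store below is a hypothetical in-memory map and is not how SurgeMQ keeps retained messages.

```go
package main

import "fmt"

// retained maps a topic to its single retained payload. Hypothetical store.
type retained map[string][]byte

// onPublish applies the retain rules to one incoming PUBLISH.
func (r retained) onPublish(topic string, payload []byte, retain bool) {
	if !retain {
		return // non-retained messages never touch the store
	}
	if len(payload) == 0 {
		delete(r, topic) // zero-byte retained payload clears the topic
		return
	}
	// Storing a copy discards any previously retained message for the topic.
	r[topic] = append([]byte(nil), payload...)
}

func main() {
	r := retained{}
	r.onPublish("abc", []byte("old"), true)
	r.onPublish("abc", []byte("new"), true)
	fmt.Println(string(r["abc"]))

	r.onPublish("abc", nil, true)
	fmt.Println(len(r))
}
```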
When I run example/pingmq, I get an error like this:
./pingmq flag redefined: log_dir
panic: ./pingmq flag redefined: log_dir
goroutine 1 [running]:
flag.(*FlagSet).Var(0xc0000a4120, 0x72f8c0, 0xc000091750, 0x6e7dfd, 0x7, 0x6f3a95, 0x2f)
/usr/local/go/src/flag/flag.go:805 +0x529
flag.(*FlagSet).StringVar(0xc0000a4120, 0xc000091750, 0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f)
/usr/local/go/src/flag/flag.go:708 +0x8a
flag.(*FlagSet).String(0xc0000a4120, 0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f, 0xc000091740)
/usr/local/go/src/flag/flag.go:721 +0x8b
flag.String(0x6e7dfd, 0x7, 0x0, 0x0, 0x6f3a95, 0x2f, 0xc000091720)
/usr/local/go/src/flag/flag.go:728 +0x69
I have some confusion; please delete this issue if I got this wrong.
1 code: https://github.com/surgemq/surgemq/blob/master/service/buffer.go#L121-L123
is:
this.pcond.L.Lock()
this.ccond.Broadcast()
this.pcond.L.Unlock()
or just:
this.ccond.L.Lock()
this.ccond.Broadcast()
this.ccond.L.Unlock()
?
2 https://github.com/surgemq/surgemq/blob/master/topics/memtopics.go#L30-L48
Why are there two identical lines:
var _ TopicsProvider = (*memTopics)(nil)
Appreciate!