
Eclipse Paho MQTT Go client

This repository contains the source code for the Eclipse Paho MQTT 3.1/3.1.1 Go client library.

This code builds a library which enables applications to connect to an MQTT broker to publish messages, and to subscribe to topics and receive published messages.

This library supports a fully asynchronous mode of operation.

A client supporting MQTT V5 is also available.

Installation and Build

The process depends upon whether you are using modules (recommended) or GOPATH.

Modules

If you are using modules then import "github.com/eclipse/paho.mqtt.golang" and start using it. The necessary packages will be downloaded automatically when you run go build.

Note that the latest release will be downloaded and changes may have been made since the release. If you have encountered an issue, or wish to try the latest code for another reason, then run go get github.com/eclipse/paho.mqtt.golang@master to get the latest commit.

GOPATH

Installation is as easy as:

go get github.com/eclipse/paho.mqtt.golang

The client depends on the golang.org/x/net/proxy package and the Gorilla websocket package, also easily installed with the commands:

go get github.com/gorilla/websocket
go get golang.org/x/net/proxy

Usage and API

Detailed API documentation is available by using the godoc tool, or can be browsed online using the pkg.go.dev service.

Samples are available in the cmd directory for reference.

Note:

The library also supports using MQTT over websockets by using the ws:// (insecure) or wss:// (secure) prefix in the URI. If the client is running behind a corporate http/https proxy then the HTTP_PROXY, HTTPS_PROXY and NO_PROXY environment variables are taken into account when establishing the connection.

Troubleshooting

If you are new to MQTT and your application is not working as expected, reviewing the MQTT specification, which this library implements, is a good first step. MQTT.org has some good resources that answer many common questions.

Error Handling

The asynchronous nature of this library makes it easy to forget to check for errors. Consider using a goroutine to log these:

t := client.Publish("topic", qos, retained, msg)
go func() {
    _ = t.Wait() // Can also use '<-t.Done()' in releases > 1.2.0
    if t.Error() != nil {
        log.Error(t.Error()) // Use your preferred logging technique (or just fmt.Printf)
    }
}()

Logging

If you are encountering issues then enabling logging, both within this library and on your broker, is a good way to begin troubleshooting. This library can produce various levels of logging, enabled by assigning the logging endpoints ERROR, CRITICAL, WARN and DEBUG. For example:

func main() {
	mqtt.ERROR = log.New(os.Stdout, "[ERROR] ", 0)
	mqtt.CRITICAL = log.New(os.Stdout, "[CRIT] ", 0)
	mqtt.WARN = log.New(os.Stdout, "[WARN]  ", 0)
	mqtt.DEBUG = log.New(os.Stdout, "[DEBUG] ", 0)

	// Connect, Subscribe, Publish etc..
}

Common Problems

  • Seemingly random disconnections may be caused by another client connecting to the broker with the same client identifier; this is as per the spec.
  • Unless ordered delivery of messages is essential (and you have configured your broker to support this e.g. max_inflight_messages=1 in mosquitto) then set ClientOptions.SetOrderMatters(false). Doing so will avoid the below issue (deadlocks due to blocking message handlers).
  • A MessageHandler (called when a new message is received) must not block (unless ClientOptions.SetOrderMatters(false) is set). If you wish to perform a long-running task, or publish a message, then please use a goroutine (blocking in the handler is a common cause of unexpected pingresp not received, disconnecting errors).
  • When QOS1+ subscriptions have been created previously and you connect with CleanSession set to false it is possible that the broker will deliver retained messages before Subscribe can be called. To process these messages either configure a handler with AddRoute or set a DefaultPublishHandler. If there is no handler (or DefaultPublishHandler) then inbound messages will not be acknowledged. Adding a handler (even if it's opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})) is highly recommended to avoid inadvertently hitting inflight message limits.
  • Loss of network connectivity may not be detected immediately. If this is an issue then consider setting ClientOptions.KeepAlive (sends regular messages to check the link is active).
  • Reusing a Client is not completely safe. After calling Disconnect please create a new Client (NewClient()) rather than attempting to reuse the existing one (note that features such as SetAutoReconnect mean this is rarely necessary).
  • Brokers offer many configuration options; some settings may lead to unexpected results.
  • Publish tokens will complete if the connection is lost and re-established using the default options.SetAutoReconnect(true) functionality (token.Error() will return nil). Attempts will be made to re-deliver the message but there is currently no easy way to know when such messages are delivered.

If using Mosquitto then there are a range of fairly common issues:

  • listener - By default Mosquitto v2+ listens on loopback interfaces only (meaning it will only accept connections made from the computer it's running on).
  • max_inflight_messages - Unless this is set to 1 mosquitto does not guarantee ordered delivery of messages.
  • max_queued_messages / max_queued_bytes - These impose limits on the number/size of queued messages. The defaults may lead to messages being silently dropped.
  • persistence - Defaults to false (messages will not survive a broker restart).
  • max_keepalive - defaults to 65535 and, from version 2.0.12, SetKeepAlive(0) will result in a rejected connection by default.
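For reference, a minimal mosquitto.conf sketch touching the options above; the values are illustrative, not recommendations:

```conf
# Accept connections from other machines (v2+ defaults to loopback only)
listener 1883

# Guarantee ordered delivery of messages (at the cost of throughput)
max_inflight_messages 1

# Raise the queue limit so messages are not silently dropped
max_queued_messages 1000

# Keep messages across broker restarts
persistence true
persistence_location /var/lib/mosquitto/
```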

Reporting bugs

Please report bugs by raising issues for this project on GitHub: https://github.com/eclipse/paho.mqtt.golang/issues

A limited number of contributors monitor the issues section, so if you have a general question please see the resources in the More information section for help.

We welcome bug reports, but it is important they are actionable. A significant percentage of issues reported are not resolved due to a lack of information. If we cannot replicate the problem then it is unlikely we will be able to fix it. The information required will vary from issue to issue but almost all bug reports would be expected to include:

  • Which version of the package you are using (tag or commit - this should be in your go.mod file)
  • A full, clear, description of the problem (detail what you are expecting vs what actually happens).
  • Configuration information (code showing how you connect, please include all references to ClientOption)
  • Broker details (name and version).

If at all possible please also include:

  • Details of your attempts to resolve the issue (what have you tried, what worked, what did not).
  • A Minimal, Reproducible Example. Providing an example is the best way to demonstrate the issue you are facing; it is important this includes all relevant information (including broker configuration). Docker (see cmd/docker) makes it relatively simple to provide a working end-to-end example.
  • Broker logs covering the period the issue occurred.
  • Application Logs covering the period the issue occurred. Unless you have isolated the root cause of the issue please include a link to a full log (including data from well before the problem arose).

It is important to remember that this library does not stand alone; it communicates with a broker and any issues you are seeing may be due to:

  • Bugs in your code.
  • Bugs in this library.
  • The broker configuration.
  • Bugs in the broker.
  • Issues with whatever you are communicating with.

When submitting an issue, please ensure that you provide sufficient details to enable us to eliminate causes outside of this library.

Contributing

We welcome pull requests but before your contribution can be accepted by the project, you need to create and electronically sign the Eclipse Contributor Agreement (ECA) and sign off on the Eclipse Foundation Certificate of Origin.

More information is available in the Eclipse Development Resources; please take special note of the requirement that the commit record contain a "Signed-off-by" entry.

More information

Stack Overflow has questions and answers covering many common issues, both relating to use of this library and to MQTT in general. This is the best place to ask general questions (including those relating to the use of this library).

Discussion of the Paho clients takes place on the Eclipse paho-dev mailing list.

General questions about the MQTT protocol are discussed in the MQTT Google Group.

There is much more information available via the MQTT community site.


paho.mqtt.golang's Issues

Regular disconnect causes errors to be logged

migrated from Bugzilla #467705
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version 0.9 on platform Macintosh
Assigned to: Al Stockdill-Mander

On 2015-05-20 08:31:46 -0400, Fabian Ruff wrote:

I noticed that every time I call client.Disconnect()

I get the following log output from the mqtt lib:

DEBUG [client] disconnecting
DEBUG [net] obound priority msg to write, type *packets.DisconnectPacket
DEBUG [net] outbound wrote disconnect, stopping
ERROR [net] incoming stopped with error
ERROR [net] logic got error
DEBUG [pinger] keepalive stopped
DEBUG [client] disconnected
DEBUG [store] memorystore closed

The two error lines seem like a bug to me. Looking at the code its seems there is logic in place for handling a clean shutdown by closing the c.stop channel.

The problem seems to be that the channel is closed after the tcp connection is closed.

Printing the error returned by packets.ReadPacket(c.conn) in func incoming(c *Client) yields a
read tcp 127.0.0.1:1883: use of closed network connection

I'm using mosquitto 1.4.2 as the mqtt broker.
This was tested with the latest master of the paho library (SHA: 1d65364)

On 2015-06-24 06:39:50 -0400, Al Stockdill-Mander wrote:

Should be resolved with this commit
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=efdb79c

Thanks for reporting.

On 2015-06-24 16:40:20 -0400, Fabian Ruff wrote:

I just did a quick test with the latest master and the error messages are still there, so on first glance it looks like the changes didn't fix it.

I will do some more detailed testing tomorrow.

Kind regards,
Fabian

On 2015-06-25 10:32:40 -0400, Fabian Ruff wrote:

I did some further debugging and I think I found the problem (and the fix).

The problem is in disconnect() where a channel read from c.stop is used to figure out if the channel is already closed.
This read blocks because nothing is ever send on the stop channel and only returns after the channel is closed by internalConnLost().

The solution is to use the default: case of the select statement to check if the stop channel hasn't been closed already.

Cheers,
Fabian

index e888522..c9d2174 100644
--- git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go
+++ git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go
@@ -384,10 +384,9 @@ func (c *Client) internalConnLost(err error) {

 func (c *Client) disconnect() {
 	select {
-	case _, ok := <-c.stop:
-		if ok {
-			close(c.stop)
-		}
+	case <-c.stop:
+	default:
+		close(c.stop)
 	}
 	//Wait for all workers to finish before closing connection
 	c.workers.Wait()

On 2015-09-18 04:42:11 -0400, Fabian Ruff wrote:

Bump. I kindly ask for some feedback on my latest message on this issue

On 2015-09-30 04:14:58 -0400, Al Stockdill-Mander wrote:

The change you suggested caused some of the test cases to hang as we end up with disconnect waiting on the WaitGroup and incoming() goroutine waiting on ReadPacket(), I have made a change that seems to handle this and not produce error output (and in the process found I was leaking goroutines with the router).
I've pushed my changes please let me know if this resolves the issue for you or if it exposes anything new.

On 2015-09-30 07:23:48 -0400, Fabian Ruff wrote:

Initial tests on my side are looking good. Thanks for taking care!

On 2015-10-05 06:25:28 -0400, Al Stockdill-Mander wrote:

Marking as resolved based on last comment

Build Fails When Vendored

When downloaded into the vendor directory as a dependency of a project, the command

eval go get -t -v ./...

gives error:

github.com/mainflux/mainflux/vendor/github.com/eclipse/paho.mqtt.golang/samples
# github.com/mainflux/mainflux/vendor/github.com/eclipse/paho.mqtt.golang/samples
vendor/github.com/eclipse/paho.mqtt.golang/samples/routing.go:61: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/custom_store.go:65
vendor/github.com/eclipse/paho.mqtt.golang/samples/sample.go:42: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/routing.go:61
vendor/github.com/eclipse/paho.mqtt.golang/samples/sample.go:107: main.func1 redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/custom_store.go:73
vendor/github.com/eclipse/paho.mqtt.golang/samples/simple.go:31: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/sample.go:42
vendor/github.com/eclipse/paho.mqtt.golang/samples/ssl.go:97: f redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/simple.go:29
vendor/github.com/eclipse/paho.mqtt.golang/samples/ssl.go:99: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/simple.go:31
vendor/github.com/eclipse/paho.mqtt.golang/samples/stdinpub.go:31: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/ssl.go:99
vendor/github.com/eclipse/paho.mqtt.golang/samples/stdoutsub.go:37: main redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/stdinpub.go:31
vendor/github.com/eclipse/paho.mqtt.golang/samples/stdoutsub.go:43: main.func1 redeclared in this block
    previous declaration at vendor/github.com/eclipse/paho.mqtt.golang/samples/sample.go:107

Provide ability to retrieve options used to create client

migrated from Bugzilla #486529
status UNCONFIRMED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2016-01-26 03:08:32 -0500, Mark Mindenhall wrote:

After creating a Client with ClientOptions, there's no way to retrieve the ClientOptions from the Client. I want to implement a single ConnectionLostHandler to be used by all my connections (I have more than one), but when the handler is invoked, there's no way to tell from the *Client passed in which connection was lost.

Obviously, after a Client has been created, no further changes should be allowed to the ClientOptions, so the Client.options field should not be exported. However, a method could be added that returns a copy of the ClientOptions struct used to create the Client:

// Options returns a copy of the ClientOptions used to create this client.
func (c *Client) Options() ClientOptions {
	optionsCopy := c.options
	return optionsCopy
}

On 2016-01-27 02:35:29 -0500, Mark Mindenhall wrote:

Actually, since the Servers field is an array of *url.URL, it's not as trivial as assigning to a new variable to copy the struct (I think the copy would retain the pointers). Instead, the struct can be cloned via encoding/gob as follows (warning, may not be 100% correct, but gives the general idea):

// Options returns a copy of the ClientOptions used to create this client.
func (c *Client) Options() ClientOptions {
	var buf bytes.Buffer
	enc := gob.NewEncoder(&buf)
	enc.Encode(c.options)

	var optionsCopy ClientOptions
	dec := gob.NewDecoder(&buf)
	dec.Decode(&optionsCopy)
	return optionsCopy
}

On 2016-01-27 10:57:21 -0500, Mark Mindenhall wrote:

I think a better solution would be to create an interface comprised of getters for ClientOptions fields. Then provide a method on the Client that returns the interface

Another solution here would be to create an interface comprised of getters for ClientOptions, then provide a method on Client that returns the interface.

On 2016-01-27 10:59:54 -0500, Mark Mindenhall wrote:

I somehow submitted the previous comment before I was done editing...sorry for the redundancy, but you get the point.

Waiting subscribe token is finished before message receiving

migrated from Bugzilla #464969
status CLOSED severity normal in component MQTT-Go for 1.1
Reported in version 0.9 on platform All
Assigned to: Al Stockdill-Mander

On 2015-04-20 01:17:45 -0400, [email protected] Takanori wrote:

I use the Client.Subscribe function to receive messages from the MQTT broker.

The implementation is as follows:

func Subscribe(client *MQTT.Client, topic string, qos byte) {
	var handler MQTT.MessageHandler = func(client *MQTT.Client, msg MQTT.Message) {
		fmt.Printf("Received message : topic=%s, message=%s\n", msg.Topic(), msg.Payload())
	}

	token := client.Subscribe(topic, qos, handler)

	if token.Wait() && token.Error() != nil {
		fmt.Printf("Subscribe error: %s\n", token.Error())
	}
}

If a message is received, 'token.Wait()' finishes before the message is handled. And if no message is received, the handler is not called.

With this behavior, it can't be distinguished whether a message was received or not.

Is it possible for 'token.Wait()' to wait until the MessageHandler function has finished?

On 2015-04-20 04:47:46 -0400, Al Stockdill-Mander wrote:

Subscribe function is different from receiving messages.
The token from calling client.Subscribe will be complete when the client has received the SUBACK from the server, this means that the server has received the request to subscribe to a topic from the client.

MQTT is asynchronous pub/sub, it is not possible with this client (or most clients) to subscribe and receive a message in a single action.

As I say, the subscribe function completes when the server has received your subscribe request. The handler function will be called separately when your client receives a message.

On 2015-04-22 11:33:31 -0400, [email protected] Takanori wrote:

Thank you for your reply.

> As I say, the subscribe function completes when the server has received your subscribe request. The handler function will be called separately when your client receives a message.

I understood the behavior of the 'Subscribe' function.
'token.Wait()' completes when the server has received the subscribe request.

I will use it like that.

client.Disconnect() panics when client tries to reconnect

When the client tries to reconnect after losing the connection to the broker, calling client.Disconnect() causes a panic in https://github.com/eclipse/paho.mqtt.golang/blob/master/client.go#L441, because c.conn is nil even when client.IsConnected() reports true.

Explicit check for nil value avoids the panic, but I'm not sure it cures the root cause.

func (c *client) disconnect() {
    c.closeStop()
    if c.conn != nil {
        c.conn.Close()
    }
    c.workers.Wait()
    close(c.stopRouter)
    DEBUG.Println(CLI, "disconnected")
    c.persist.Close()
}

versions:

mosquitto version 1.4.8 (build date Sun, 14 Feb 2016 15:06:55 +0000)
go version go1.7 linux/amd64

test case:

package main

import (
    "log"
    "os"
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    MQTT.ERROR = log.New(os.Stdout, "", 0)
    MQTT.WARN = log.New(os.Stdout, "", 0)
    MQTT.DEBUG = log.New(os.Stdout, "", 0)

    quit := make(chan struct{})

    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker("tcp://127.0.0.1:1883")
    connOpts.SetAutoReconnect(true)
    connOpts.SetCleanSession(true)
    connOpts.SetConnectionLostHandler(func(client MQTT.Client, err error) {
        close(quit)
    })
    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Println("connect", token.Error())
        return
    }
    log.Println("Stop mosquitto now, please.")
    select {
    case <-quit:
        log.Println("quit")
    }
    if client.IsConnected() {
        log.Println("disconnecting")
        client.Disconnect(5000)
    }
    log.Println("exit")
}

crash in MQTT packet parser

migrated from Bugzilla #475949
status NEW severity normal in component MQTT-Go for 1.2
Reported in version v0.5 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-08-26 11:59:14 -0400, Julien Vermillard wrote:

If you try to parse a packet with the value "30" in hex you receive a panic:

panic: runtime error: makeslice: len out of range [recovered]
panic: runtime error: makeslice: len out of range

goroutine 8 [running]:
testing.tRunner.func1(0xc8200902d0)
/usr/local/go/src/testing/testing.go:450 +0x171
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets.(*PublishPacket).Unpack(0xc82001c420, 0x7f9fd1e7c648, 0xc820012310)
/home/jvermillar/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets/publish.go:55 +0xd9
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets.ReadPacket(0x7f9fd1e7c648, 0xc8200122a0, 0x7f9fd1e7c698, 0xc82001c420, 0x0, 0x0)
/home/jvermillar/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets/packets.go:126 +0x51e
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets.TestInvalidLen(0xc8200902d0)
/home/jvermillar/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets/packets_test.go:127 +0x25c
testing.tRunner(0xc8200902d0, 0x6fb568)
/usr/local/go/src/testing/testing.go:456 +0x98
created by testing.RunTests
/usr/local/go/src/testing/testing.go:561 +0x86d

goroutine 1 [chan receive]:
testing.RunTests(0x64f728, 0x6fb520, 0x5, 0x5, 0xc820053e01)
/usr/local/go/src/testing/testing.go:562 +0x8ad
testing.(*M).Run(0xc820053ef8, 0xc8200961e0)
/usr/local/go/src/testing/testing.go:494 +0x70
main.main()
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets/_test/_testmain.go:62 +0x116

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1696 +0x1
exit status 2
FAIL git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets 0.005s

Faulty code:
p.Payload = make([]byte, payloadLength)
If the payloadLength is < 0 we get a Go panic.

On 2015-08-26 12:00:12 -0400, Julien Vermillard wrote:

The unit test for it (to put in packets_test.go):

func TestInvalidLen(t *testing.T) {
	buff, err := hex.DecodeString("30")
	if err != nil {
		t.Fatalf("Error decoding hex packet: %s", err.Error())
	}

	packetBytes := bytes.NewBuffer(buff)
	_, err = ReadPacket(packetBytes)
	if err == nil {
		t.Errorf("Must generate an error")
	}
}

On 2015-08-26 12:18:25 -0400, Julien Vermillard wrote:

type ControlPacket interface {
	Write(io.Writer) error
	Unpack(io.Reader)
	String() string
	Details() Details
	UUID() uuid.UUID
}
Unpack(io.Reader) should be able to return an error, or it will be quite difficult to fix this bug, WDYT?

Clients on localhost constantly disconnect/reconnect to broker

We have been having issues where clients on the local host would disconnect from the broker due to a ping response timeout even though the response was received. Once the problem starts, it progressively gets worse.

2016/05/27 22:00:53 ping.go:33: [pinger] keepalive sending ping
2016/05/27 22:00:53 net.go:81: [net] Received Message
2016/05/27 22:00:53 net.go:176: [net] logic got msg on ibound
2016/05/27 22:00:53 net.go:180: [net] received pingresp
2016/05/27 22:00:53 net.go:172: [net] logic waiting for msg on ibound
2016/05/27 22:00:53 ping.go:40: [pinger] pingresp not received, disconnecting
2016/05/27 22:00:53 net.go:108: [net] outgoing stopped
2016/05/27 22:00:53 net.go:267: [net] logic stopped
2016/05/27 22:00:53 net.go:88: [net] incoming stopped
2016/05/27 22:00:53 client.go:264: [client] enter reconnect
2016/05/27 22:00:53 client.go:275: [client] about to write new connect msg
2016/05/27 22:00:53 client.go:553: Connection lost: pingresp not received, disconnecting
2016/05/27 22:00:53 client.go:278: [client] socket connected to broker
2016/05/27 22:00:53 client.go:285: [client] Using MQTT 3.1.1 protocol
2016/05/27 22:00:53 client.go:349: [net] connect started
2016/05/27 22:00:53 client.go:367: [net] received connack
2016/05/27 22:00:53 net.go:102: [net] outgoing started
2016/05/27 22:00:53 net.go:105: [net] outgoing waiting for an outbound message
2016/05/27 22:00:53 net.go:169: [net] logic started
2016/05/27 22:00:53 net.go:172: [net] logic waiting for msg on ibound
2016/05/27 22:00:53 client.go:331: [client] client is reconnected
2016/05/27 22:00:53 net.go:75: [net] incoming started
2016/05/27 22:00:53 ping.go:24: [pinger] keepalive starting

Multiple clients in single program not allowed

migrated from Bugzilla #447998
status CLOSED severity normal in component MQTT-Go for 1.1
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2014-10-20 19:37:48 -0400, Cem Ezberci wrote:

The current implementation of the Go package does not allow creating multiple clients in the same program. The use case is testing multiple connections to a broker and publishing messages to load test the broker. The test was done creating multiple clients in a loop within an anonymous function prefaced with the go keyword. The following error is produced:

[error] go-mqtt suffered fatal error EOFexit status 1

On 2014-10-20 20:12:22 -0400, Cem Ezberci wrote:

Problem solved. Apparently client id has to be unique across connecting clients, which makes sense. It was not easy to figure out why it failed though.

On 2014-10-20 20:12:53 -0400, Cem Ezberci wrote:

Closing

Publish stalls if client is in connecting state

Hi!

I'm developing an application that connects to a lot of different brokers. For this it needs to handle the unavailability of brokers and take necessary actions.

I've run into a problem regarding publishing messages. If you try to connect a client with AutoReconnect = true (which is the default), the client will be in the connecting state even if the connect fails.

If you try to publish a message with this client, the application will stall.
This is because the isConnected method returns true if AutoReconnect is true and the connection status is > disconnected, which is true for every client that tried connecting at least once.

The only way to prevent this is to set AutoReconnect to false and handle reconnects manually or to disconnect the client after a failed attempt to connect.

Here is an example code to reconstruct the issue:

package main

import (
    "fmt"
    "log"
    "os"
    "time"

    "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    mqtt.DEBUG = log.New(os.Stdout, "", 0)
    mqtt.ERROR = log.New(os.Stdout, "", 0)

    // Set some broker that is not reachable
    opts := mqtt.NewClientOptions().AddBroker("tcp://notreachable:1883").SetClientID("gotrivial")

    c := mqtt.NewClient(opts)

    // Connect should and will fail
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
    }

    // Check if client is connected and if yes, send message
    if c.IsConnected() {

        // SHOULD NOT GO HERE, BUT DOES
        fmt.Println("publishing message")
        token := c.Publish("testtopic", 0, false, "testmessage")

        // WILL NEVER GO HERE IF BROKER IS NOT REACHABLE
        fmt.Println("waiting for publish to complete")
        token.WaitTimeout(time.Second*5)
        fmt.Println("publish finished")
    }

    fmt.Println("Disconnecting")
    c.Disconnect(250)

    time.Sleep(1 * time.Second)
}

I would suggest adding a new method like isReachable() or modifying the isConnected() method.

On Publish, Golang paho client is reconnecting, not publishing

Hello, I am using the paho client for golang. It was working fine; after an update, it started reconnecting on publish and not publishing messages.

func start_mqtt() {
	opts := MQTTT.NewClientOptions().AddBroker(MQTT)
	opts.SetClientID("MY_CLIENT_1")
	opts.SetAutoReconnect(true)
	opts.SetKeepAlive(time.Second * 300)
	opts.SetDefaultPublishHandler(f)

	MqttClient = MQTTT.NewClient(opts)

	if token := MqttClient.Connect(); token.Wait() && token.Error() != nil {
		log.Println("Error while connecting to Mqtt Broker.")
		panic(token.Error())
	}
	log.Println("Mqtt connection successful.")

	if token := MqttClient.Subscribe("MY_CLIENT_1", 1, nil); token.Wait() && token.Error() != nil {
		log.Println("Error while subscribing to topic 1.")
		fmt.Println(token.Error())
		os.Exit(1)
	}
	log.Println("Topic subscription successful.")

	// UNTIL HERE IT WORKS

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := MqttClient.Publish("MY_CLIENT_2", 1, false, text)
		token.Wait()
	}

	// PUBLISH IS NOT WORKING
}

Go client panics if OnConnectionLost is nil

migrated from Bugzilla #464934
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2015-04-18 11:29:03 -0400, Scott Daniel wrote:

If the OnConnectionLost option is nil, the client will panic when a connection is lost. The client should check for nil values and not start the goroutine on line 374 of client.go (see below):

func (c *Client) internalConnLost(err error) {
	close(c.stop)
	c.conn.Close()
	c.workers.Wait()
	if c.IsConnected() {
		go c.options.OnConnectionLost(c, err)
		if c.options.AutoReconnect {
			go c.reconnect()
		} else {
			c.setConnected(false)
		}
	}
}

On 2015-04-20 04:40:19 -0400, Al Stockdill-Mander wrote:

http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 1d65364

Thanks for reporting.

[bug] auto-reconnect hangs on first attempt.

Auto-reconnect seems to hang when attempting to reconnect to an SSL host with credentials. I am using the latest master of the Go client and mosquitto 1.4.9. There is a load balancer in front of it forwarding 1883, and forwarding 8883 with enforced SSL.
Edit: I just attempted a non-SSL connection, and the same issue arises.

This is my method of connecting.

    opts := mqtt.NewClientOptions()
    opts.AddBroker(secureMQTTHost)
    opts.SetCleanSession(false)
    opts.SetAutoReconnect(true)
    opts.SetUsername(username)
    opts.SetPassword(password)
    if config.DebugMode {
        mqtt.DEBUG = log.New(os.Stdout, "DEBUG ", 0)
        mqtt.WARN = log.New(os.Stdout, "WARN ", 0)
        mqtt.CRITICAL = log.New(os.Stdout, "CRITICAL ", 0)
        mqtt.ERROR = log.New(os.Stdout, "ERROR ", 0)
    }

    s.mqttClient = mqtt.NewClient(opts)
    if token := s.mqttClient.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf("Could not connect to mqtt broker.")
    }

Even when the broker host is local with some self-signed certificates, I run into the same issue.

The trace log is as follows; it hangs forever on DEBUG [net] connect started. Publish calls hang forever unless I specify a timeout, which of course always times out.

DEBUG [client]   Connect()
DEBUG [store]    memorystore initialized
DEBUG [client]   about to write new connect msg
DEBUG [client]   socket connected to broker
DEBUG [client]   Using MQTT 3.1.1 protocol
DEBUG [net]      connect started
DEBUG [net]      received connack
DEBUG [client]   client is connected
DEBUG [client]   exit startClient
DEBUG [net]      outgoing started
DEBUG [net]      outgoing waiting for an outbound message
DEBUG [net]      incoming started
DEBUG [pinger]   keepalive starting
DEBUG [net]      logic started
DEBUG [net]      logic waiting for msg on ibound
DEBUG [client]   enter Publish
DEBUG [client]   sending publish message, topic: heartbeat
DEBUG [net]      obound wrote msg, id: 1
DEBUG [net]      outgoing waiting for an outbound message
DEBUG [net]      Received Message
DEBUG [net]      logic got msg on ibound
DEBUG [store]    memorystore del: message 1 was deleted
DEBUG [net]      received puback, id: 1
DEBUG [net]      logic waiting for msg on ibound
...
ERROR [net]      incoming stopped with error EOF
ERROR [net]      logic received from error channel, other components have errored, stopping
DEBUG [pinger]   keepalive stopped
DEBUG [net]      outgoing stopped
DEBUG [client]   enter reconnect
DEBUG [client]   about to write new connect msg
DEBUG Connection lost: EOF
DEBUG [client]   socket connected to broker
DEBUG [client]   Using MQTT 3.1.1 protocol
DEBUG [net]      connect started

panic when Disconnect() invoked after internalConnLost

migrated from Bugzilla #464559
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version 0.9 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-04-14 00:22:00 -0400, shirou WAKAYAMA wrote:

The paho library panics when Disconnect() is invoked.

Here is my sample program.

package main

import (
"fmt"
"time"

MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

func main() {
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
opts.SetCleanSession(true)

c := MQTT.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
fmt.Println("plz down mosquitto now")
time.Sleep(5 * time.Second)

c.Disconnect(200)
time.Sleep(5 * time.Second)

}

How to reproduce:

  1. mosquitto up
  2. go run close_panic.go
  3. mosquitto down after "plz down mosquitto now" shows
  4. wait 5 sec
  5. panic

This is caused by close(c.stop) in Client.disconnect(). c.stop has already been closed in internalConnLost(), so the second disconnect() closes an already-closed channel.

To avoid the panic, I think we should check whether the channel is already closed, using select/case.

@@ -382,7 +383,12 @@ func (c *Client) internalConnLost(err error) {
 }
 
 func (c *Client) disconnect() {
-	close(c.stop)
+	select {
+	case _, ok := <-c.stop:
+		if ok {
+			close(c.stop)
+		}
+	}
This fixes the problem, at least in my environment.

Thank you.

On 2015-04-14 00:42:15 -0400, shirou WAKAYAMA wrote:

I forgot to include the error and my environment.

go version go1.4.2 darwin/amd64

panic: close of closed channel

goroutine 1 [running]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).disconnect(0xc20805e000)
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:385 +0x3a
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).Disconnect(0xc20805e000, 0xc8)
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:355 +0x466
main.main()
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/samples/close_panic.go:21 +0x210

goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/Cellar/go/1.4.2/libexec/src/runtime/asm_amd64.s:2232 +0x1

goroutine 6 [select]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·003()
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/router.go:131 +0x4e6
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*router).matchAndDispatch
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/router.go:161 +0x166

goroutine 13 [sleep]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).reconnect(0xc20805e000)
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:287 +0xd83
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).internalConnLost
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:377 +0xe5

exit status 2

Thank you.

On 2015-04-17 11:19:32 -0400, Al Stockdill-Mander wrote:

http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 15034ff

Thanks

disconnect causes panic in keep-alive

migrated from Bugzilla #463046
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version 0.9 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-03-25 04:02:17 -0400, shirou WAKAYAMA wrote:

After disconnection, it seems the keep-alive goroutine is still active, so it tries to WriteTo() a closed connection, which causes a panic.

Trace log is below.

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x20 pc=0x1523d4]

goroutine 10 [running]:
bytes.(*Buffer).WriteTo(0xc208010310, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/Cellar/go/1.4.1/libexec/src/bytes/buffer.go:202 +0xd4
git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets.(*PingreqPacket).Write(0xc20803af90, 0x0, 0x0, 0x0, 0x0)
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets/pingreq.go:23 +0xd7
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.keepalive(0xc208058000)
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/ping.go:60 +0x6ec
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001
/Users/blah/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:213 +0x1338

I tried adding an IsConnected() check here: http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/ping.go#n60
But it did not work.

It is also necessary to insert c.setConnected(false) after the "logic got error" case, around
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/net.go#n275.

Thank you.

On 2015-03-25 13:03:10 -0400, Al Stockdill-Mander wrote:

Do you have a test case that reproduces this issue? I have been trying, but in my tests the keepalive routine is always stopped after calling Disconnect/ForceDisconnect. While trying to debug this, though, I noticed a problem with the Unsubscribe packet and Unsubscribe tokens not being completed.

On 2015-03-25 13:17:24 -0400, Al Stockdill-Mander wrote:

(In reply to shirou WAKAYAMA from comment # 0)

It also need to insert c.setConnected(false) into after "logic got error"
caused, around
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/net.
go#n275.

This is currently intentional; with the autoreconnect feature I want to allow you to continue to call Publish()/Subscribe() etc. in your application while the client library reconnects for you in the background. The flow of your application should not be interrupted by a minor network issue.

However, if you have disabled auto reconnect, I should set the client as disconnected and not allow the application to continue making these calls.

On 2015-03-25 21:12:21 -0400, shirou WAKAYAMA wrote:

(In reply to Al Stockdill-Mander from comment # 1)

Do you have a test case that reproduces this issue? I have been trying but
in my tests keepalive routine is always stopped after calling
Disconnect/ForceDisconnect. While trying to debug this though I noticed a
problem with the Unsubscribe packet and Unsubscribe tokens not being
completed.

Sorry, here is sample code.

go version go1.4.1 darwin/amd64
mosquitto version 1.3.5 (build date 2014-10-27 15:13:47+0000)
paho commit: SHA: d7795b6

package main

import (
"time"

MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
)

func main() {
opts := MQTT.NewClientOptions().AddBroker("tcp://localhost:1883")
c := MQTT.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
for {
time.Sleep(1 * time.Second)
}
}

just Connect() and sleep forever.

  1. start mosquitto
  2. go run test.go
  3. shutdown only mosquitto
  4. wait 30sec - 1 min
  5. panic

On 2015-03-25 21:25:00 -0400, shirou WAKAYAMA wrote:

This is currently intentional, with the autoreconnect feature I want to
allow you to be able to continue to call Publish()/Subscribe() etc in your
application while the client library reconnects for you in the background.
The flow of your application should not be interrupted for a minor network
issue.

Thank you, that makes sense.

However if you have disabled auto reconnect I should set the client as
disconnected and not allow the application to continue making these calls.

Sounds great. With that, the application can understand connectivity and control Pub/Sub (or whatever) accordingly.

On 2015-03-26 13:11:47 -0400, Al Stockdill-Mander wrote:

Hopefully the change here (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: ccf6fcf) should have fixed this issue.

On 2015-03-26 21:46:50 -0400, shirou WAKAYAMA wrote:

(In reply to Al Stockdill-Mander from comment # 5)

Hopefully the change here
(http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/
?id=SHA: ccf6fcf) should have fixed this issue.

confirmed. Thank you so much!

The acknowledgement for incoming messages is sent even if the associated MessageHandler panics

migrated from Bugzilla #469350
status UNCONFIRMED severity major in component MQTT-Go for 1.2
Reported in version 0.9 on platform Macintosh
Assigned to: Al Stockdill-Mander

On 2015-06-04 02:50:04 -0400, Saikrishnan Ranganathan wrote:

It looks like the acknowledgements for inbound QoS 1 and 2 messages are sent to the broker even if the MessageHandler associated with the topic panics. So when the client next reconnects, any QoS 1 or 2 messages will NOT be redelivered by the broker.

Disconnection due to ping race

migrated from Bugzilla #479220
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform Other
Assigned to: Al Stockdill-Mander

On 2015-10-07 05:40:52 -0400, Ivan Shvedunov wrote:

In some cases MQTT client disconnects after a minute or so. Network traffic capture shows that the client closes connection without receiving anything from the server. With debug logging turned on (by editing trace.go), the sequence of events turns out to be the following:

  1. Client sends PINGREQ ("keepalive sending ping")
  2. Client receives PINGRESP ("received pingresp")
  3. After 30 seconds, the client disconnects with the following diagnostics: "pingresp not received, disconnecting"

The problem doesn't always manifest itself. In my case it's only observed on an ARMv5 board which is not very fast.

This behaviour is apparently caused by a race condition in ping.go (line 61 in commit SHA: 546c47a):
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/ping.go?id=SHA: 546c47a#n61

Namely, sometimes the ping.Write(c.conn) call may finish after the PINGRESP has been received, and the subsequent setting of c.pingOutstanding = true causes the client to disconnect after the ping timeout. Setting c.pingOutstanding = true before ping.Write(c.conn) fixes the problem:
ivan4th/org.eclipse.paho.mqtt.golang@SHA: 0f67af290eaa090cfd92febdb1075063a31890fe

On 2015-10-07 12:27:34 -0400, Al Stockdill-Mander wrote:

Thanks for reporting this, I've just made the change and pushed it to git.

On the tracing issue the tracepoints ERROR, DEBUG, etc in trace.go are exported by the library so your app can create a logger for any of them and output them as you wish. The first two commented out lines in main() in http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/samples/stdinpub.go give an example.

Current API makes unit testing more difficult due to its use of structs rather than interfaces

migrated from Bugzilla #443160
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version unspecified on platform All
Assigned to: Al Stockdill-Mander

On 2014-09-03 01:18:03 -0400, Mark Wolfe wrote:

Currently the core externalised functions in this library use structs, and are therefore impossible to mock for unit testing.

External-facing types should be returned as interfaces via the API; this will enable consumers to build mock implementations for testing, and will bring it more in line with other Go libraries.

This affects all external facing calls which currently expose *Message, *Client, *TopicFilter, *TopicName and *ClientOptions.

On 2014-09-03 08:12:36 -0400, Al Stockdill-Mander wrote:

I have just started a develop branch of the Go client and pushed a large change I had been working on that includes some resolution for the issue here. Please take a look, all feedback welcome.

On 2014-09-03 08:12:52 -0400, Al Stockdill-Mander wrote:

http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?h=develop&id=SHA: 33499ed

^ direct link to the commit in the develop branch

On 2014-09-05 11:46:29 -0400, Julien Vermillard wrote:

Codes looks good! I'm going to test it :)

On a side note: org.eclipse.paho.mqtt.golang.git doesn't look like idiomatic Go; do you think there is a way to import the code in a less Java-ish package fashion?

On 2014-09-05 11:59:52 -0400, Al Stockdill-Mander wrote:

Yeah, you're right, the import does look a bit strange. It might be possible to change the name of the repo; it would still be under "git.eclipse.org/gitroot/paho/", we just have to decide what the last element might be.

On 2015-03-23 14:41:31 -0400, Al Stockdill-Mander wrote:

I don't seem to be able to change the import URL, but I have abstracted the Client struct and other required structs for testing.

Panic in connect() after reconnection.

migrated from Bugzilla #478905
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version 1.2 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-10-02 09:09:03 -0400, Tim H wrote:

I get the following crash.

panic: interface conversion: packets.ControlPacket is *packets.PubackPacket, not *packets.ConnackPacket

goroutine 44 [running]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).connect(0x106a4000, 0x4042b688)
c:/Users/thutt/Gocode/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:327 +0x638
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).reconnect(0x106a4000)
c:/Users/thutt/Gocode/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:262 +0x644
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).internalConnLost
c:/Users/thutt/Gocode/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:378 +0xf8

It's actually probably my fault - I previously noticed that the MQTT connection got dropped or stalled or something, and didn't reconnect. So I used the ClientOpts.OnConnectionLost feature to reconnect, not noticing that there was already an AutoReconnect option. The result is that I guess it tried to connect twice.

Anyway, I think the code is still probably wrong, because it tries to protect against this error (an unexpected packet) but clearly fails:

func (c *Client) connect() byte {
    DEBUG.Println(NET, "connect started")

    ca, err := packets.ReadPacket(c.conn)
    if err != nil {
        ERROR.Println(NET, "connect got error", err)
        //c.errors <- err
        return packets.ErrNetworkError
    }
    msg := ca.(*packets.ConnackPacket)

    if msg == nil || msg.FixedHeader.MessageType != packets.Connack {
        ERROR.Println(NET, "received msg that was nil or not CONNACK")
    } else {
        DEBUG.Println(NET, "received connack")
    }
    return msg.ReturnCode
}

Here is how it should presumably be:

func (c *Client) connect() byte {
    DEBUG.Println(NET, "connect started")

    ca, err := packets.ReadPacket(c.conn)
    if err != nil {
        ERROR.Println(NET, "connect got error", err)
        return packets.ErrNetworkError
    }
    if ca == nil {
        ERROR.Println(NET, "received nil packet")
        return packets.ErrNetworkError
    }

    msg, ok := ca.(*packets.ConnackPacket)
    if !ok {
        ERROR.Println(NET, "received msg that was not CONNACK")
        return packets.ErrNetworkError
    }

    DEBUG.Println(NET, "received connack")
    return msg.ReturnCode
}

On 2015-10-04 06:17:49 -0400, Al Stockdill-Mander wrote:

Thanks for raising the bug, I've made changes that pass the default test suite but I don't currently have a test in there that tests reconnect, but not breaking the standard connect makes me hopeful that it should be ok. Let me know if this resolves your problem or causes any other issues. As you say it's possible your issue was caused by a dual attempt to connect on the same client, but even so it shouldn't have been crashing.

client.Connect() after client.Disconnect() doesn't work for persistent sessions

If I start a durable client connection (setCleanSession = false) and disconnect, then try to reconnect with the same client, I don't receive any messages. The publish and subscribe are both done with QoS 1.

My suspicion is that on client disconnect the stopRouter chan is closed and it is not reopened on connect.

If I add these lines above line 233 in client.go, my session resumes just fine.

	c.msgRouter, c.stopRouter = newRouter()
	c.msgRouter.setDefaultHandler(c.options.DefaultPublishHander)
	if !c.options.AutoReconnect {
		c.options.MessageChannelDepth = 0
	}

bad keepalive value in connect packet

migrated from Bugzilla #483024
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2015-11-25 12:45:38 -0500, j3r0lin chen wrote:

In message.go:111, m.KeepaliveTimer = uint16(options.KeepAlive): the keep alive timer value in the connect packet should be in seconds, but as written it is in nanoseconds.

On 2015-12-01 08:35:32 -0500, Al Stockdill-Mander wrote:

Thanks for reporting this, I have fixed it in the latest push.

[feature] Re-subscribe on reconnect

migrated from Bugzilla #462919
status CLOSED severity enhancement in component MQTT-Go for 1.1
Reported in version 0.9 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-03-24 04:51:59 -0400, shirou WAKAYAMA wrote:

When the client reconnects while subscribed, it does not re-send Subscribe, so it cannot receive any messages after the reconnect.
I tried to re-subscribe using the ConnectionLostHandler, but it is invoked in a goroutine (http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/net.go#n275), so the reconnect order is not fixed.

How about storing a subscribed list in the Client struct and using it at the end of reconnect()?

type Client struct{
(snip)
subscribed map[string]byte // same as SubscribeMultiple
}

And slice management is needed in the Subscribe and Unsubscribe.

If this is implemented, the client becomes more stable, I think.

Thank you.

On 2015-03-24 05:39:49 -0400, Al Stockdill-Mander wrote:

This is true, the client will not resend Subscribe requests when reconnecting. There are two solutions that will already work for this: putting Subscribe()s in the onConnect callback, or setting cleansession to false (so the server remembers your session).
You are right that Subscribe in the onConnectionLostHandler is not going to work, as that is triggered at the time of the connection loss.

I think that if you want your subscriptions to persist across disconnections, use of the cleansession flag is the appropriate method; putting a local copy of subscriptions into the Client struct seems to be replicating that function in the client.

"And slice management is needed in the Subscribe and Unsubscribe." I am not clear what you mean here, please can you expand.

On 2015-03-24 06:00:14 -0400, shirou WAKAYAMA wrote:

Thank you for replying.

(In reply to Al Stockdill-Mander from comment # 1)

This is true, the client will not resend Subscribe requests when
reconnecting. There are two solutions that will already work for this;
putting Subscribe()s in the onConnect callback, setting cleansession to
false (so the server remembers your session).

Subscribe() in the onConnect callback is reasonable. But a callback is not intuitive for regular operation, I think.

The cleansession approach is the right way per the specification. But re-sending Subscribe is still needed, and in the paho-go implementation, storing a subscribed list and re-sending Subscribe at the appropriate time from the application side seems difficult.

How about adding a ReconnectedHandler at the end of reconnect()? Using this, the application can re-send Subscribe after the connection is re-established.

"And slice management is needed in the Subscribe and Unsubscribe." I am not
clear what you mean here, please can you expand.

Not so important thing. If subscribed list stored inside the client struct, this should be managed inside Subscribe and Unsubscribe like this.

func (c *Client) Subscribe(topic string, qos byte, callback MessageHandler) Token {
(snip)

c.subscribed[topic] = qos  // it should be mutexed.

}

On 2015-03-24 06:13:24 -0400, Al Stockdill-Mander wrote:

(In reply to shirou WAKAYAMA from comment # 2)

Subscribe() in the onConnect callback is reasonable. But callback is not
instinctive for the regular operation, I think.

cleansession approach is right way in the specification. But re-send
Subscribe is still needed. And on the paho-go implementation, store
subscribed list and resend Subscribe at applicable timing from the
Application side is seems difficult.

I am concerned about providing, at the client level, functionality that the protocol already provides. If we put in auto resubscribe, people might be less likely to use cleansession. I am not against the idea of doing this; I want to understand why auto resubscribe is necessary when cleansession exists.

How about add ReconnectedHandler on the end of the reconnect()? Using this,
application can re-send Subscribe after connection re-established.

That seems unnecessary, what would you want to do at the time of reconnect that you wouldn't also do at initial connect? And for that you can use the onConnect callback.

"And slice management is needed in the Subscribe and Unsubscribe." I am not
clear what you mean here, please can you expand.

Not so important thing. If subscribed list stored inside the client struct,
this should be managed inside Subscribe and Unsubscribe like this.

func (c *Client) Subscribe(topic string, qos byte, callback MessageHandler)
Token {
(snip)

c.subscribed[topic] = qos  // it should be mutexed.

}

Ahh right, yes, understand.

On 2015-03-24 09:08:18 -0400, shirou WAKAYAMA wrote:

I am concerned about providing function that implements at the client level
something that the protocol provides. If we put in auto resubscribe people
might be less likely to use cleansession. I am not against the idea of doing
this, I want to understand why auto resubscribe is necessary when
cleansession exists?

My understanding of clean session is that it stores session information on the server side so messages arriving while disconnected can be received later. Auto resubscribe is client side and stores only the "subscribed list" used to re-subscribe. It is implementation level, like error handling, not protocol level, I think.

However,

How about add ReconnectedHandler on the end of the reconnect()? Using this,
application can re-send Subscribe after connection re-established.

That seems unnecessary, what would you want to do at the time of reconnect
that you wouldn't also do at initial connect? And for that you can use the
onConnect callback.

Now I understand that the onConnect callback is also invoked at reconnect(), so Subscribe() inside onConnect works for me. As I said, normal processing inside a handler is not straightforward and may introduce complexity, but it is acceptable.

Given that, I will take the onConnect handler approach. Thank you for discussing.

On 2015-03-24 09:27:20 -0400, Al Stockdill-Mander wrote:

(In reply to shirou WAKAYAMA from comment # 4)

My understanding about clean session is storing session information on the
server side and get messages while disconnected. Auto resubscribe is an
client side and stores only "subscribed list" which is used before
subscribe. It is implementation level like a error handling, not protocol
level, I think.

Cleansession stores your subscription information so when you are reconnected you are still subscribed to the same topics as before (and as you say it will store messages delivered to your subscriptions while you are disconnected)

However,

How about add ReconnectedHandler on the end of the reconnect()? Using this,
application can re-send Subscribe after connection re-established.

That seems unnecessary, what would you want to do at the time of reconnect
that you wouldn't also do at initial connect? And for that you can use the
onConnect callback.

now I understand onConnect callback is also invoked at reconnect(). So,
Subscribe() inside onConnect is works for me. As I said, normal process
inside a handler is not straightforward and may introduce complexity. But it
is acceptable.

Since that, I choose onConnect Handler approach. Thank you for discussing.

Thank you for your feedback; if you think there are better, easier ways to do this, please let me know. I am used to the idea of callbacks from when I worked on the C client, but I am happy to change this if you think something else fits better with Go.

On 2015-03-24 10:13:43 -0400, shirou WAKAYAMA wrote:

now I understand onConnect callback is also invoked at reconnect(). So,
Subscribe() inside onConnect is works for me. As I said, normal process
inside a handler is not straightforward and may introduce complexity. But it
is acceptable.

Since that, I choose onConnect Handler approach. Thank you for discussing.

Thank you for your feedback, if you think there are better, easier, ways to
do this please let me know. I am used to the idea of callbacks from when I
worked on the C client but I am happy to change this if you think something
else fits better with Go.

I am still trying to write better, more understandable code, but cannot yet.
I will close this issue; when I find something better, I will open a new issue.

Thank you for replying.

Packet field typo

  • Connect.KeepaliveTimer -> KeepAlive
  • Suback.GrantedQoss -> ReturnCodes

Support a Connect Timeout

migrated from Bugzilla #473551
status RESOLVED severity enhancement in component MQTT-Go for 1.2
Reported in version unspecified on platform All
Assigned to: Al Stockdill-Mander

Original attachment names and IDs:

On 2015-07-24 18:15:40 -0400, Manuel Rabade wrote:

Created attachment 255415
Implement SetConnectTimeout() on the client options and honor it in the openConnection() method

The SetWriteTimeout() option works great for configuring the timeout of an established connection, but the openConnection() method doesn't have a timeout, so if you try to connect to an MQTT broker without network access, the timeout depends on your OS (a couple of minutes at least on Linux).

This patch implements a SetConnectTimeout() option so you can configure a timeout for the connect phase of the connection. Sorry if it's not the optimal way to do it, but I think it is better to explain the problem and propose a solution.

Cheers.

On 2015-09-30 03:30:53 -0400, Al Stockdill-Mander wrote:

Thanks for your suggestion, I have made changes to incorporate this function.
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 18fcc52

Accept a net.Conn for initialising the client

migrated from Bugzilla #446895
status NEW severity normal in component MQTT-Go for 1.1
Reported in version future on platform PC
Assigned to: Al Stockdill-Mander

On 2014-10-13 09:22:40 -0400, Al Stockdill-Mander wrote:

Rather than only connecting over a specified set of network transports, the client should be able to be initialised with a net.Conn that has already established the connection. The Paho Go library could provide some helper functions that create connections over known transports.
It needs to be considered how this would interact with the auto reconnection feature.

EOF error if client is already connected

Hi,

I am using following code to create new client:

func CreateMQTTClient(clientID string) (client MQTT.Client) {
    username := viper.GetString("messaging.rabbitmq.username")
    password := viper.GetString("messaging.rabbitmq.password")
    host := viper.GetString("messaging.rabbitmq.host")
    mqqtPort := viper.GetString("messaging.rabbitmq.mqqtPort")
    rabbitMqMQQTURL := "tcp://" + host + ":" + mqqtPort
    opts := MQTT.NewClientOptions().AddBroker(rabbitMqMQQTURL)
    opts.SetClientID(clientID)
    opts.Username = username
    opts.Password = password
    opts.SetCleanSession(false)
    opts.ProtocolVersion = 4
    opts.OnConnect = OnConnectHandler
    opts.AutoReconnect = true
    opts.OnConnectionLost = OnConnectionLostHandler
    cli := MQTT.NewClient(opts)

    log.Println(cli.IsConnected())

    token := cli.Connect()
    token.Wait()
    if token.Error() != nil {
        log.Print(token.Error())
    }


    return cli

}

Since there is no way for me to get an existing client and check whether a client already exists for a given client ID, if I call this function again I get an EOF error, and OnConnectHandler and OnConnectionLostHandler are called forever until I kill the Go server. This happens only if AutoReconnect is true.

Incorrect mutex locking order in router.go lines 149 and 151

migrated from Bugzilla #439667
status RESOLVED severity major in component MQTT-Go for 1.1
Reported in version v0.5 on platform PC
Assigned to: Al Stockdill-Mander

Original attachment names and IDs:

On 2014-07-15 20:24:38 -0400, Bill Hathaway wrote:

At line 146, r.RUnlock is called, but if !sent and order==true, then r.RUnlock() is called again on line 149, which causes a runtime exception.

146 r.RUnlock()
147 if !sent {
148 if order {
149 r.RUnlock()
150 r.defaultHandler(client, *message)
151 r.RLock()

The fix is to switch lines 149 and 151.

Example stack trace
panic: sync: Unlock of unlocked RWMutex

goroutine 24 [running]:
runtime.panic(0x224d60, 0xc208000b80)
/usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
sync.(*RWMutex).Unlock(0xc20805e440)
/usr/local/go/src/pkg/sync/rwmutex.go:114 +0x9b
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·002()
/Users/bhathaway/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/router.go:149 +0x302
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(_router).matchAndDispatch
/Users/bhathaway/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/router.go:160 +0xdc

On 2014-07-15 20:44:33 -0400, Bill Hathaway wrote:

Created attachment 245094
git diff

On 2014-08-01 06:22:41 -0400, Al Stockdill-Mander wrote:

Thanks for your fix; this has been merged. We also have Gerrit configured for this project, allowing you to push your changes directly for review and merging.
https://wiki.eclipse.org/Development_Resources/Contributing_via_Git#via_Gerrit

client loses connection to broker (on Raspberry Pi 2)

I'm seeing a weird issue with the client, which only happens when running on a Raspberry Pi 2.

ratatosk ➜  ~  go version
go version go1.3.3 linux/arm

I'm using the following code to connect to the broker

    url := fmt.Sprintf("tcp://%s:%d", host, port)
    opts := MQTT.NewClientOptions().AddBroker(url)
    opts.SetDefaultPublishHandler(OnMessage)
    opts.SetConnectionLostHandler(ConnectionLost)

    logger.Debug("Connecting to ", url)
    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    logger.Debug("Subscribing to ", topic)
    //subscribe to the topic and request messages to be delivered
    //at a maximum qos of two, wait for the receipt to confirm the subscription
    if token := c.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

When running on my MacBook it works just fine, but on my RPi2 it disconnects from the broker every 30-60 seconds, and while it seems to reconnect (per the logs), it never receives anything; it simply "hangs".

Here's a debug trace

[net]      Received Message
[net]      logic got msg on ibound
[net]      received pubrel, id: 33
[net]      logic waiting for msg on ibound
[net]      obound priority msg to write, type *packets.PubcompPacket
[net]      outgoing waiting for an outbound message
13:58:49.209 UpdateLastUp ▶ DEBUG  Updating last update for  fenrir : 28-0014550
75dff : 2016-02-24 13:58:49.110356 +0000 UTC
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[net]      outgoing stopped
[net]      incoming stopped
13:59:29.146 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[net]      logic started
[net]      logic waiting for msg on ibound
[pinger]   keepalive starting
[pinger]   keepalive sending ping
[net]      incoming started
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      outgoing stopped
[net]      incoming stopped
13:59:49.158 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[pinger]   keepalive starting
[net]      incoming started
[net]      logic started
[net]      logic waiting for msg on ibound
[net]      outgoing stopped
[net]      incoming stopped
13:59:59.158 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[pinger]   keepalive starting
[net]      incoming started
[net]      logic started
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound

Token.Wait() crashes

b.MqttClient = MQTT.NewClient(opts)
if token := b.MqttClient.Connect(); token.Wait() && token.Error() != nil { .............

After narrowing it down I found the problem to be with the .Wait() function.
Every time I enter a wrong broker URL with whitespace in it (in the middle, or at the end of the string...), the .Wait() call on the token returned by client.Connect() crashes the whole program, and I can't figure out how to intercept this, as the function doesn't allow error handling.

token.Error() is nil
b.MqttClient.Connect() works fine; this is the token I get:
&{baseToken:{m:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} complete:0xc4200633e0 ready:false err:} returnCode:0}

This is the panicking error I get
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x581117]
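Until the library surfaces this as an error, one defensive option is to validate the broker URI before handing it to AddBroker/Connect. A hedged sketch; validateBroker is a hypothetical helper, not a paho function:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// validateBroker rejects broker URIs containing stray whitespace and
// URIs that fail to parse, before the MQTT client ever sees them.
func validateBroker(uri string) error {
	if strings.TrimSpace(uri) != uri || strings.ContainsAny(uri, " \t") {
		return fmt.Errorf("broker URI contains whitespace: %q", uri)
	}
	if _, err := url.Parse(uri); err != nil {
		return fmt.Errorf("broker URI does not parse: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(validateBroker("tcp://broker.local:1883")) // <nil>
	fmt.Println(validateBroker("tcp://broker.local :1883") != nil)
}
```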

Desire to know if a message is queued

If the connection is down, I would like to know so I can back off sending new messages. Checking IsConnected returns true as long as the client is attempting a reconnection.
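One workaround, assuming the standard SetOnConnectHandler/SetConnectionLostHandler options, is to track the live connection state in the application and gate publishes on it. The gate type below is illustrative, not part of the library (atomic.Bool needs Go 1.19+):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// gate records whether the connection is actually up, fed by the
// connect / connection-lost callbacks the client options expose.
type gate struct{ up atomic.Bool }

func (g *gate) CanSend() bool { return g.up.Load() }

func main() {
	g := &gate{}
	// In real code, wire the handlers into ClientOptions:
	//   opts.SetOnConnectHandler(func(_ mqtt.Client) { g.up.Store(true) })
	//   opts.SetConnectionLostHandler(func(_ mqtt.Client, _ error) { g.up.Store(false) })
	g.up.Store(true) // simulate the OnConnect callback firing
	fmt.Println(g.CanSend())
	g.up.Store(false) // simulate the ConnectionLost callback firing
	fmt.Println(g.CanSend())
}
```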

ping.go should wait a reasonable amount of time for the broker response

migrated from Bugzilla #486528
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

Original attachment names and IDs:

On 2016-01-26 03:04:27 -0500, Manuel Rabade wrote:

Created attachment 259373
KeepAliveTimeout option

In ping.go the PINGREQ is sent according to the KeepAlive option, but the routine expects a PINGRESP from the broker within just one second. The client should wait a reasonable amount of time for the broker's PINGRESP. On slow networks this bug causes many client disconnects from the broker.

I propose a KeepAliveTimeout option to configure a second timer in ping.go to wait for the broker answer.

On 2016-01-28 04:10:02 -0500, Al Stockdill-Mander wrote:

Looking at the ping timer and handling section, I'm thinking I should probably move to using time.Timer rather than the one-second sleep loop currently in place. In addition, I agree that there should be a configurable timeout for PINGRESP handling.

On 2016-02-08 04:45:28 -0500, Al Stockdill-Mander wrote:

I've changed the ping code to use a proper time.Timer and added an option to set the amount of time to wait for a ping response, with a default of 10 seconds.
Tests all pass; please let me know if this resolves your issue.

Wrong documentation for NewClientOptions default values

migrated from Bugzilla #472777
status RESOLVED severity minor in component MQTT-Go for 1.2
Reported in version unspecified on platform All
Assigned to: Al Stockdill-Mander

On 2015-07-15 18:18:06 -0400, Manuel Rabade wrote:

The documentation for NewClientOptions default values says:

Port: 1883
CleanSession: True
Timeout: 30 (seconds)
Tracefile: os.Stdout

But according to options.go (branch master) I think it should say:

CleanSession: True
Order: True
KeepAlive: 30 (seconds)
MaxReconnectInterval: 10 (minutes)
AutoReconnect: True

On 2015-09-30 03:41:08 -0400, Al Stockdill-Mander wrote:

Thanks for reporting this, updated
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 28e323f

Multi-level wildcard does not match parent

migrated from Bugzilla #487236
status CLOSED severity major in component MQTT-Go for 1.2
Reported in version 1.2 on platform All
Assigned to: Al Stockdill-Mander

On 2016-02-04 09:26:55 -0500, Brian Faull wrote:

The multi-level wildcard does not seem to match the parent topic.

E.g., with a subscription to topic
some/topic/#

Paho does not seem to receive messages posted to
some/topic
... but it should.

though it does (and should!) receive messages to all of the following:
some/topic/
some/topic/one
some/topic/one/two/three

Spec current (v3.1.1) section 4.7.1.2 "Multi-level wildcard" states the intended behavior:
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107

Apparently offending code:
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/topic.go#n49

I see similar behavior in MQTT.fx, which I believe uses Paho-Java, so this could be a problem in other language packages also.

I believe I am using the current version of Paho-Go. I did not see a previous bug on this. I believe (but can't entirely rule out) that this is Paho behavior and not bad behavior of the MQTT broker I'm using. If I have a chance I shall submit a bug-fix/PR.
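For reference, the multi-level wildcard rule quoted above (v3.1.1 section 4.7.1.2: "#" also matches the parent level) can be sketched as a small matcher. This is an illustrative re-implementation of the spec behavior, not the library's topic.go code, and it ignores "$"-prefixed topics:

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether an MQTT topic filter matches a topic name.
// Per the spec, "some/topic/#" matches "some/topic" as well as any
// deeper level; "+" matches exactly one level.
func matches(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			return true // multi-level wildcard covers the parent and all children
		}
		if i >= len(t) {
			return false // filter is deeper than the topic (and not via "#")
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	return len(f) == len(t)
}

func main() {
	fmt.Println(matches("some/topic/#", "some/topic"))         // true
	fmt.Println(matches("some/topic/#", "some/topic/one/two")) // true
	fmt.Println(matches("some/topic/#", "other/topic"))        // false
}
```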

On 2016-02-04 09:56:35 -0500, Brian Faull wrote:

Upon further investigation it seems that this is probably "my problem" -- a problem with the broker endpoint and not with Paho-Go. I have filed a query with that provider and will delete this bug if I hear back with confirmation.

I apologize for the premature filing, and thank you for a great library!

On 2016-02-05 11:01:47 -0500, Brian Faull wrote:

(In reply to Brian Faull from comment # 1)

Upon further investigation it seems that this is probably "my problem" -- a
problem with the broker endpoint and not with Paho-Go. I have filed a
query with that provider and will delete this bug if I hear back with
confirmation.

I apologize for the premature filing, and thank you for a great library!

This is my problem, not the Paho library problem. I apologize. This bug can be deleted.

Race condition

migrated from Bugzilla #448001
status RESOLVED severity major in component MQTT-Go for 1.1
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2014-10-21 01:19:52 -0400, Cem Ezberci wrote:

It seems like there is a race condition in the package. I ran the simple.go example with the -race flag, and the output below was the result. This causes random panics when multiple client connections are created from the same program:

(trusty)ezbercih@localhost:~/code/go/src/mqtt$ go run -race simple.go

WARNING: DATA RACE
Write by goroutine 6:
sync/atomic.CompareAndSwapInt32()
/home/ezbercih/go/src/runtime/race_amd64.s:282 +0xc
sync.(*Mutex).Lock()
/home/ezbercih/go/src/sync/mutex.go:43 +0x50
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001()
/home/ezbercih/code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:41 +0x5a

Previous read by main goroutine:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start()
/home/ezbercih/code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:124 +0xc4e
main.main()
/home/ezbercih/code/go/src/mqtt/simple.go:35 +0xfd

Goroutine 6 (running) created at:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(_messageIds).generateMsgIds()
/home/ezbercih/code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:51 +0xea
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(_MqttClient).Start()
/home/ezbercih/code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:113 +0xadc
main.main()

/home/ezbercih/code/go/src/mqtt/simple.go:35 +0xfd

TOPIC: /go-mqtt/sample
MSG: this is msg # 0!
TOPIC: /go-mqtt/sample
MSG: this is msg # 1!
TOPIC: /go-mqtt/sample
MSG: this is msg # 2!
TOPIC: /go-mqtt/sample
MSG: this is msg # 3!
TOPIC: /go-mqtt/sample
MSG: this is msg # 4!
Found 1 data race(s)
exit status 66

On 2014-10-21 12:04:55 -0400, Al Stockdill-Mander wrote:

Thanks for reporting this.
The version in the develop branch does not seem to suffer from the data race; I changed how the message IDs work. I don't plan to fix issues in the master branch now and am looking to move the develop branch across in the next month.

nil pointer exception on Client.Connect()

migrated from Bugzilla #463540
status RESOLVED severity minor in component MQTT-Go for 1.1
Reported in version 0.9 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-03-31 01:27:28 -0400, shirou WAKAYAMA wrote:

This is a nitpicky problem, but it still occurred in my environment. (Sorry, I cannot reproduce it with mosquitto.)

In the Client.Connect(),

if

then c.conn == nil and err == nil,

so a nil pointer dereference occurs at err.Error().
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/client.go#n183

How about using fmt.Errorf() instead of errors.New()?
Like:
t.err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)

Errorf just formats a placeholder instead of panicking when err is nil.

Thank you.

On 2015-04-08 10:42:31 -0400, Al Stockdill-Mander wrote:

http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 80999ff

On 2015-04-08 10:42:48 -0400, Al Stockdill-Mander wrote:

Thanks again for reporting the bugs you're seeing.

Subscribing to a topic with a trailing slash doesn't match messages without one

migrated from Bugzilla #444939
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2014-09-24 07:47:25 -0400, Mark Wolfe wrote:

We found an issue where if you specify

$test/123/

The Paho client accepts it without error or warning, then fails to match topics without the slash.

I am interested to hear whether this is consistent with other clients, as mosquitto DOES match with and without the slash.

Cheers

On 2014-09-24 10:34:27 -0400, Al Stockdill-Mander wrote:

MQTT v3.1 didn't specify hard behaviour for this situation, whereas MQTT v3.1.1 is much more definite; http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/cos01/mqtt-v3.1.1-cos01.pdf Section 4.7.3 - Topic Semantic and Usage
"A leading or trailing '/' creates a distinct Topic Name or Topic Filter"

So subscribing to $test/123 is separate and distinct from $test/123/ and both are valid.
If internally the broker treats the two as the same and sends the client messages from $test/123 when it subscribed to $test/123/, the internal subscription matching of the Go client won't find a matching callback, and at best you'll get the DefaultMessageHandler.

Other clients that don't allow subscription specific message handlers will not see this problem. Given the language in the 3.1.1 standard I do not intend to change this behaviour.

On 2014-09-24 12:26:17 -0400, Roger Light wrote:

FWIW, this behaviour should be fixed in mosquitto 1.4. I made a good few MQTT V3.1.1 compliance fixes a while back.

packets.(*PingreqPacket).Write(0xc8200f3b60, 0x0, 0x0, 0x0, 0x0), invalid memory address or nil pointer dereference

panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x20 pc=0x561e72]
goroutine 38 [running]:
bytes.(*Buffer).WriteTo(0xc82048adf8, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/bytes/buffer.go:206 +0xc2
org.eclipse.paho.mqtt.golang.git/packets.(*PingreqPacket).Write(0xc8200f3b60, 0x0, 0x0, 0x0, 0x0)
org.eclipse.paho.mqtt.golang.git/packets/pingreq.go:23 +0x83
org%2eeclipse%2epaho%2emqtt%2egolang%2egit.keepalive(0xc8201dc300)
org.eclipse.paho.mqtt.golang.git/ping.go:60 +0x77a
created by org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*Client).Connect.func1
org.eclipse.paho.mqtt.golang.git/client.go:212 +0x13a1

Using it in my project, after running for a few days (maybe 100,000 iterations), packets.(*PingreqPacket).Write(0xc8200f3b60, 0x0, 0x0, 0x0, 0x0) fails with the error above, "invalid memory address or nil pointer dereference".

I use the packets like this:

func main() {
	client := MQTT.NewClient(opts)
	// publish a message every 3 seconds in a separate goroutine
	go func() {
		for range time.Tick(3 * time.Second) {
			client.Publish(topic, 0, false, payload)
		}
	}()
	client.Subscribe(topic, 0, nil)
}

Intermittent test failures

I'm getting some intermittent test failures in fvt_client_test.go, specifically in Test_Will. Sometimes it passes, but occasionally it will either:

  • reach the end of the test, but connections stay open so test doesn't end
  • panic on double-closing the Client.stop channel (in internalConnLost)

I've also seen the latter in Test_Binary_Will.

The logs for the former look like:

=== RUN   Test_Will
WARNING  17:50:18 [net]      logic stopped
DEBUG    17:50:18 [client]   Connect()
DEBUG    17:50:18 [client]   about to write new connect msg
DEBUG    17:50:18 [client]   socket connected to broker
DEBUG    17:50:18 [client]   Using MQTT 3.1.1 protocol
DEBUG    17:50:18 [net]      connect started
DEBUG    17:50:18 [net]      received connack
DEBUG    17:50:18 [store]    memorystore initialized
DEBUG    17:50:18 [client]   client is connected
WARNING  17:50:18 [store]    memorystore wiped
DEBUG    17:50:18 [client]   exit startClient
DEBUG    17:50:18 [client]   enter Subscribe
DEBUG    17:50:18 [net]      outgoing started
DEBUG    17:50:18 [net]      logic started
DEBUG    17:50:18 [pinger]   keepalive starting
DEBUG    17:50:18 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [/wills]
DEBUG    17:50:18 [client]   exit Subscribe
DEBUG    17:50:18 [net]      incoming started
DEBUG    17:50:18 [client]   Connect()
DEBUG    17:50:18 [client]   about to write new connect msg
DEBUG    17:50:18 [net]      outgoing waiting for an outbound message
DEBUG    17:50:18 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG    17:50:18 [net]      outgoing waiting for an outbound message
DEBUG    17:50:18 [net]      logic waiting for msg on ibound
DEBUG    17:50:19 [net]      Received Message
DEBUG    17:50:19 [net]      logic got msg on ibound
DEBUG    17:50:19 [net]      received suback, id: 1
DEBUG    17:50:19 [net]      granted qoss [0]
DEBUG    17:50:19 [net]      logic waiting for msg on ibound
DEBUG    17:50:19 [client]   socket connected to broker
DEBUG    17:50:19 [client]   Using MQTT 3.1.1 protocol
DEBUG    17:50:19 [net]      connect started
DEBUG    17:50:19 [net]      received connack
DEBUG    17:50:19 [store]    memorystore initialized
DEBUG    17:50:19 [client]   client is connected
WARNING  17:50:19 [store]    memorystore wiped
DEBUG    17:50:19 [client]   exit startClient
DEBUG    17:50:19 [net]      outgoing started
DEBUG    17:50:19 [net]      outgoing waiting for an outbound message
DEBUG    17:50:19 [pinger]   keepalive starting
DEBUG    17:50:19 [net]      incoming started
DEBUG    17:50:19 [net]      logic started
DEBUG    17:50:19 [net]      logic waiting for msg on ibound
DEBUG    17:50:20 [client]   forcefully disconnecting
ERROR    17:50:20 [net]      incoming stopped with error
WARNING  17:50:20 [net]      logic stopped
DEBUG    17:50:20 [net]      outgoing stopped
DEBUG    17:50:20 [pinger]   keepalive stopped
DEBUG    17:50:20 [net]      Received Message
DEBUG    17:50:20 [net]      logic got msg on ibound
DEBUG    17:50:20 [net]      received publish, msgId: 0
DEBUG    17:50:20 [net]      putting msg on onPubChan
DEBUG    17:50:20 [net]      done putting msg on incomingPubChan
DEBUG    17:50:20 [net]      logic waiting for msg on ibound
TOPIC: /wills
MSG: good-byte!                           # produced when `wsub` receives the will at fvt_client_test.go:202
DEBUG    17:50:48 [pinger]   keepalive sending ping
DEBUG    17:50:49 [net]      Received Message
DEBUG    17:50:49 [net]      logic got msg on ibound
DEBUG    17:50:49 [net]      received pingresp
DEBUG    17:50:49 [net]      logic waiting for msg on ibound
DEBUG    17:50:59 [pinger]   keepalive sending ping
DEBUG    17:50:59 [net]      Received Message
DEBUG    17:50:59 [net]      logic got msg on ibound
DEBUG    17:50:59 [net]      received pingresp
DEBUG    17:50:59 [net]      logic waiting for msg on ibound
...

Is there any extra setup I should be doing before running tests, or is this a genuine bug?

keepalive does not send PINGREQ after reconnect

migrated from Bugzilla #475756
status RESOLVED severity normal in component MQTT-Go for 1.2
Reported in version 1.1 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-08-25 01:42:10 -0400, shirou WAKAYAMA wrote:

When the client fails to receive a PINGRESP once, it is disconnected.
After reconnecting, the keepalive does not send another PINGREQ, so the client is disconnected again very soon after it reconnects.

Here is the debug log.

DEBUG 12:25:07 [pinger] keepalive sending ping
CRITICAL 12:25:08 [pinger] pingresp not received, disconnecting
DEBUG 12:25:08 [net] outgoing stopped
WARNING 12:25:08 [net] logic stopped
DEBUG 12:25:08 [net] incoming stopped
DEBUG 12:25:08 [client] enter reconnect
DEBUG 12:25:08 [client] about to write new connect msg
DEBUG 12:25:09 [client] socket connected to broker
DEBUG 12:25:09 [client] Using MQTT 3.1.1 protocol
DEBUG 12:25:09 [net] connect started
DEBUG 12:25:09 [net] received connack
DEBUG 12:25:09 [client] client is reconnected
DEBUG 12:25:09 [net] incoming started
DEBUG 12:25:09 [net] logic started
DEBUG 12:25:09 [net] logic waiting for msg on ibound
DEBUG 12:25:09 [pinger] keepalive starting
DEBUG 12:25:09 [net] outgoing started
DEBUG 12:25:09 [net] outgoing waiting for an outbound message

<----- keepalive does not sending ping until here ------->

CRITICAL 12:25:20 [pinger] pingresp not received, disconnecting
WARNING 12:25:20 [net] logic stopped

Because even after the client has reconnected and keepalive has started, c.pingOutstanding is still true, so it always fails, I think.
This patch fixes it in my environment.

diff --git a/ping.go b/ping.go
index f8e0e5d..1ccd1ec 100644
--- a/ping.go
+++ b/ping.go
@@ -41,6 +41,7 @@ func (l *lastcontact) get() time.Time {
 
 func keepalive(c *Client) {
 	DEBUG.Println(PNG, "keepalive starting")
+	c.pingOutstanding = false
 
 	for {
 		select {

On 2015-09-18 07:23:06 -0400, Fabian Ruff wrote:

I just ran into the same issue.
Your fix worked for me as well, and it looks sound. Good catch!

Can we please incorporate this fix into master? It looks like the reconnect handling is seriously broken.

Kind regards,
Fabian

On 2015-09-30 03:16:22 -0400, Al Stockdill-Mander wrote:

Sorry for the delay in handling this bug, thanks for reporting it.
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: 0c77c5b

Crash when using SetCleanSession(false)

migrated from Bugzilla #483372
status UNCONFIRMED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2015-12-01 06:34:24 -0500, sam Nay wrote:

I get the following trace when I try to connect with SetCleanSession(false). Things work fine with SetCleanSession(true).

panic: Network Error : %!s()

goroutine 16 [running]:
runtime.panic(0x28b820, 0xc208000d40)
/usr/local/go/src/pkg/runtime/panic.c:279 +0xf5
main.main()
/Users/Foo/Work/Go/src/github.com/bar/rabbitmq/tutorial/mqtt_receiver.go:24 +0x24b

goroutine 19 [finalizer wait]:
runtime.park(0x14e70, 0x4910d0, 0x48fbc9)
/usr/local/go/src/pkg/runtime/proc.c:1369 +0x89
runtime.parkunlock(0x4910d0, 0x48fbc9)
/usr/local/go/src/pkg/runtime/proc.c:1385 +0x3b
runfinq()
/usr/local/go/src/pkg/runtime/mgc0.c:2644 +0xcf
runtime.goexit()
/usr/local/go/src/pkg/runtime/proc.c:1445

goroutine 17 [syscall]:
runtime.goexit()
/usr/local/go/src/pkg/runtime/proc.c:1445
exit status 2

My code is:

func main() {
opts := mqtt.NewClientOptions().AddBroker("tcp://localhost:1883/")
opts.SetClientID("clientB")
opts.SetDefaultPublishHandler(mqttFunc)
opts.SetCleanSession(false)

c := mqtt.NewClient(opts)
token := c.Connect()
if token.Wait() && token.Error() != nil {
panic(token.Error()) // *** Crashes here ***
}

...

On 2015-12-01 06:44:00 -0500, sam Nay wrote:

I just tried creating a connection with a new client ID and things worked. It seems I cannot connect with cleanSession=false when using the same client ID that previously created a connection with cleanSession=true. I was not aware of a limitation like this; even if there is one, I'd expect a better error message.

Seems like the connection needs to be properly disconnected before you can reconnect with a different cleanSession value.

On 2015-12-01 08:01:03 -0500, Al Stockdill-Mander wrote:

That's not behaviour I am familiar with; it sounds like a configuration issue with the MQTT server you're using. What are you using as your MQTT server?

WebSocket payload type should be set to binary

migrated from Bugzilla #429306
status RESOLVED severity normal in component MQTT-Go for ---
Reported in version unspecified on platform All
Assigned to: Al Stockdill-Mander

Original attachment names and IDs:

On 2014-02-28 05:55:03 -0500, Ilja Dmitricenko wrote:

Created attachment 240394
patch

On 2014-03-04 10:46:17 -0500, Al Stockdill-Mander wrote:

Thanks for the patch. Please sign the CLA and I'll merge it; alternatively, we use Gerrit to front the git repos, so you can submit the patch through that and I can merge it that way.

On 2014-07-03 06:35:41 -0400, Al Stockdill-Mander wrote:

Resolved in http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=SHA: fc7cd89

The current Go client does not support MQTT 3.1.1

migrated from Bugzilla #439422
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version v0.5 on platform PC
Assigned to: Al Stockdill-Mander

On 2014-07-11 06:47:12 -0400, Al Stockdill-Mander wrote:

The Java/C/JavaScript/Python clients all support MQTT 3.1.1. The standard is currently in its final 60-day review, and server implementations now support 3.1.1 as well, so it's important that the Go client operates with these servers.

On 2015-03-23 14:39:36 -0400, Al Stockdill-Mander wrote:

It now supports the 3.1.1 features.

Store stuff doesn't actually work

Looks like the code to persist pending outbound/inbound messages was commented out at some point. It would be good to bring this feature back, or to remove it entirely if it's not going to be supported in a future release. At the moment it's quite confusing that I'm calling opts.SetStore(mqtt.NewFileStore("./some_directory")) but not getting anything stored there!

Connection recovery related conversation

migrated from Bugzilla #444937
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

Original attachment names and IDs:

On 2014-09-24 07:43:43 -0400, Mark Wolfe wrote:

Currently the paho client needs to connect at least once, to trigger the reconnect logic.

In our case this means we need to:

  • perform a lookup of the host; our software runs on hardware in the home, which can be offline for short periods of time
    • if this fails, back off for a while
  • if this is OK, then call Start
  • assign a reconnect handler

In doing so we have found a couple of issues.

Initially we used OnConnectionLost to call disconnect and then Start on the connection, but this sometimes caused a panic. We then moved to using it to allocate a new Client instance on reconnect, but this unfortunately leads to a memory leak.

So this is really a general question about the intent of the reconnect routine and the Start function. It would be good to hear what other libraries do in these cases. As this library is quite large, it would be preferable never to reallocate it, so I would prefer to be informed of a reconnect rather than needing to do something about it.

I can provide a gist to trigger the leak if required, and indeed verify it is still an issue.

On 2014-09-24 11:08:31 -0400, Al Stockdill-Mander wrote:

I'd be interested to know more about the panics that occur when you disconnect and then Start() the client from the connection-lost handler. This should be the correct way to handle the situation and is how the other clients tend to operate. Going forward I intend to provide auto-reconnect logic in the client.

On 2014-10-04 23:26:48 -0400, Mark Wolfe wrote:

Just working on some tests around this; the first issue is with a simple connection timeout.

Test program makes a connection to test.mosquitto.org then I disable wifi on my laptop.

OnLost function is as follows.

func onLost(client *mqtt.MqttClient, reason error) {
	log.Println(spew.Sprintf("connection lost %v", reason))

	// in a loop, try to reconnect to the broker
	for {
		_, err := client.Start()
		if err != nil {
			log.Printf("unable to connect : %s", err)
		} else {
			break
		}
	}
}

Full example is attached.

Resulting panic is:

2014/10/05 14:22:19 main.go:63: connection lost <>pingresp not received, disconnecting
2014/10/05 14:22:19 main.go:69: unable to connect : Unknown RC
2014/10/05 14:22:19 main.go:63: connection lost <
>use of closed network connection
panic: runtime error: close of closed channel

goroutine 38 [running]:
runtime.panic(0x2e4e40, 0x4c9315)
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/panic.c:279 +0xf5
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.alllogic(0xc20805e000)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/net.go:313 +0x2506
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:122 +0x8e8

goroutine 16 [chan receive]:
main.main()
/Users/markw/Code/go/src/github.com/wolfeidau/mqtt-testing/main.go:50 +0x543

goroutine 19 [finalizer wait, 2 minutes]:
runtime.park(0x150d0, 0x4cdb38, 0x4cc5e9)
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/proc.c:1369 +0x89
runtime.parkunlock(0x4cdb38, 0x4cc5e9)
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/proc.c:1385 +0x3b
runfinq()
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/mgc0.c:2644 +0xcf
runtime.goexit()
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/proc.c:1445

goroutine 20 [syscall, 2 minutes]:
os/signal.loop()
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/os/signal/signal_unix.go:21 +0x1e
created by os/signal.init·1
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/os/signal/signal_unix.go:27 +0x32

goroutine 17 [syscall, 2 minutes]:
runtime.goexit()
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/proc.c:1445

goroutine 21 [chan send]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001()
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:46 +0x11d
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*messageIds).generateMsgIds
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:51 +0x8b

goroutine 35 [runnable]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start(0xc20805e000, 0x0, 0x0, 0x0, 0x0, 0x0)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:126 +0xa6e
main.onLost(0xc20805e000, 0x57f1b0, 0xc208000370)
/Users/markw/Code/go/src/github.com/wolfeidau/mqtt-testing/main.go:66 +0x17a
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.alllogic
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/net.go:318 +0x2590

goroutine 36 [chan send]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001()
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:46 +0x11d
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*messageIds).generateMsgIds
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:51 +0x8b

goroutine 25 [sleep]:
time.Sleep(0x3b9aca00)
/usr/local/Cellar/go/1.3.2/libexec/src/pkg/runtime/time.goc:39 +0x31
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.keepalive(0xc20805e000)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/ping.go:70 +0x4d1
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:147 +0xf9b

goroutine 31 [chan send]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001()
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:46 +0x11d
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*messageIds).generateMsgIds
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:51 +0x8b

goroutine 30 [chan send]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start(0xc20805e000, 0x0, 0x0, 0x0, 0x0, 0x0)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:126 +0xa6e
main.onLost(0xc20805e000, 0x57f1b0, 0xc2080008f0)
/Users/markw/Code/go/src/github.com/wolfeidau/mqtt-testing/main.go:66 +0x17a
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.keepalive
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/ping.go:66 +0x622

goroutine 39 [runnable]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.func·001()
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:39
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*messageIds).generateMsgIds
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/messageids.go:51 +0x8b

goroutine 40 [runnable]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.outgoing(0xc20805e000)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/net.go:138
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:121 +0x8cd

goroutine 41 [runnable]:
git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.alllogic(0xc20805e000)
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/net.go:212
created by git.eclipse.org/gitroot/paho/org%2eeclipse%2epaho%2emqtt%2egolang%2egit.(*MqttClient).Start
/Users/markw/Code/go/src/git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/client.go:122 +0x8e8
exit status 2

On 2014-10-04 23:28:36 -0400, Mark Wolfe wrote:

Created attachment 247624
Example program which attempts reconnect

Missed attachment, sorry.

On 2014-10-13 08:55:39 -0400, Al Stockdill-Mander wrote:

I've just delivered an update in the develop branch that provides automatic reconnect functionality, I'd be interested for feedback on the design and implementation.

On 2014-11-05 16:57:48 -0500, Mark Wolfe wrote:

(In reply to Al Stockdill-Mander from comment #4)

I've just delivered an update in the develop branch that provides automatic
reconnect functionality, I'd be interested for feedback on the design and
implementation.

I have tested this a bit; initially that branch didn't compile, but I got it running.

It still seems to be having issues with reconnects, mainly around closed channels.

Did you miss pushing some changes, maybe?

On 2015-03-23 14:39:05 -0400, Al Stockdill-Mander wrote:

I have pushed the changes that were in develop into master as part of preparations for a major release. This has breaking API changes but should have a working automatic reconnect feature.
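With the post-v0.9.0 API the manual reconnect loop from the original report becomes unnecessary. A sketch of enabling the built-in behaviour (assuming the current github.com/eclipse/paho.mqtt.golang import path; broker URI and client ID are placeholders):

```go
import (
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

func newClient() mqtt.Client {
	opts := mqtt.NewClientOptions().
		AddBroker("tcp://127.0.0.1:1883").
		SetClientID("reconnect-demo").
		SetAutoReconnect(true).                    // retry in the background on connection loss
		SetMaxReconnectInterval(30 * time.Second)  // cap the backoff between attempts
	return mqtt.NewClient(opts)
}
```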

On 2015-10-05 06:26:17 -0400, Al Stockdill-Mander wrote:

Closing as resolved based on delivery of the function.

Example is broken

Hi,

the example at https://eclipse.org/paho/clients/golang/ is broken.

The following error is returned during compilation:

go-projects/src/mqttTest/main.go:15: cannot use func literal (type func(*mqtt.Client, mqtt.Message)) as type mqtt.MessageHandler in assignment

This is easily reproducible - just copy & paste the example code into a new project and try compiling it.

I'm using go 1.6.2 linux/arm
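The compile error comes from an API change: mqtt.Client became an interface, so message handlers now take mqtt.Client by value rather than *mqtt.Client. A fragment showing the fix (the handler body is illustrative):

```go
// Before (pre-v0.9.0 example code) — no longer compiles:
//   var f mqtt.MessageHandler = func(client *mqtt.Client, msg mqtt.Message) { ... }

// After — mqtt.Client is an interface, passed by value:
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}
```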

Disconnect hangs if a message is received during disconnect

Symptom: Disconnect never returns

Debug logging indicates a possible race in the disconnect code causing the incoming goroutine to block on a write to the c.ibound channel. As a result, Disconnect waits forever on the workers WaitGroup.

Here's the debug log for the disconnect illustrating the issue:

[client]   disconnecting
[net]      obound priority msg to write, type *packets.DisconnectPacket
[net]      outbound wrote disconnect, stopping
[net]      logic stopped
[net]      Received Message
[pinger]   keepalive stopped

compare this to a normal disconnect sequence:

[client]   disconnecting
[net]      obound priority msg to write, type *packets.DisconnectPacket
[net]      outbound wrote disconnect, stopping
[net]      incoming stopped
[net]      logic stopped
[pinger]   keepalive stopped
[client]   disconnected
[store]    memorystore closed
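The hang is a classic shutdown-ordering bug: the incoming goroutine blocks sending on c.ibound while Disconnect waits for it to exit. The usual fix is for every internal sender to select on a stop channel so shutdown can always proceed. A minimal sketch of the pattern (names are illustrative, not the library's internals):

```go
package main

import (
	"fmt"
	"sync"
)

// deliver sends msg to ibound unless stop closes first, so a reader that
// has gone away during shutdown can never wedge this goroutine.
func deliver(ibound chan<- string, stop <-chan struct{}, msg string) bool {
	select {
	case ibound <- msg:
		return true
	case <-stop:
		return false
	}
}

func main() {
	ibound := make(chan string) // unbuffered: a send blocks until someone reads
	stop := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		ok := deliver(ibound, stop, "late message")
		fmt.Println("delivered:", ok)
	}()

	close(stop) // nobody reads ibound; without the select this would deadlock
	wg.Wait()
}
```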

incoming stopped with error EOF when two publishers publish at the same time

I use mosquitto as the MQTT broker. When two publishers publish messages to one subscriber at the same time, it fails with:

[client]   sending publish message, topic: /006e77986952ffe946d4db0824b28057/sewise/device_name/device_1
[pinger]   keepalive starting
[net]      outgoing waiting for an outbound message
[net]      incoming started
[net]      obound wrote msg, id: 1
[net]      outgoing waiting for an outbound message
[net]      resetting ping timers
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pubrec, id: 1
[net]      logic waiting for msg on ibound
[net]      obound priority msg to write, type *packets.PubrelPacket
[net]      outgoing waiting for an outbound message
[net]      resetting ping timers
[net]      Received Message
[net]      logic got msg on ibound
[store]    memorystore del: message 1 was deleted
[net]      received pubcomp, id: 1
[net]      incoming stopped with error EOF
[net]      logic waiting for msg on ibound
[net]      logic received from error channel, other components have errored, stopping
[net]      outgoing stopped
[pinger]   keepalive stopped
Connection lost: EOF
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      incoming started
[pinger]   keepalive starting
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[net]      logic started
[net]      logic waiting for msg on ibound
[net]      incoming stopped with error EOF
[net]      logic received from error channel, other components have errored, stopping
[pinger]   keepalive stopped
[net]      outgoing stopped
Connection lost: EOF
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker

my code:

package main

import (
	"flag"
	"fmt"
	"math/rand"
	"os/signal"
	"syscall"
	"time"

	"log"
	"os"
	"strconv"

	"github.com/golang/glog"

	MQTT "github.com/eclipse/paho.mqtt.golang"
)

var (
	companyID    = flag.String("cmid", "company_id_1", "company id")
	companyName  = flag.String("cmname", "sewise", "company name")
	device_id    = flag.String("dvid", "device_1", "devices group")
	device_name  = flag.String("dvname", "device_name", "devices group")
	broker       = flag.String("broker", "tcp://127.0.0.1:1883", "The broker URI. ex: tcp://10.10.1.1:1883")
	password     = flag.String("p", "", "The password (optional)")
	user         = flag.String("u", "", "The User (optional)")
	cleansess    = flag.Bool("clean", false, "Set Clean Session (default false)")
	qos          = flag.Int("q", 1, "The Quality of Service 0,1,2 (default 1)")
	num          = flag.Int("num", 1, "The number of messages to publish or subscribe (default 1)")
	payload      = flag.String("m", "open the led", "The message text to publish (default empty)")
	messageCount = flag.Uint("mc", 0, "message count")
	random       = flag.Bool("rand", false, "enable random")
	idmax        = flag.Int("idmax", 1, "random max number")
	sleep        = flag.Int("sleep", 10, "sleep time")
)

func main() {
	MQTT.DEBUG = log.New(os.Stdout, "", 0)
	MQTT.ERROR = log.New(os.Stdout, "", 0)

	flag.Parse()

	// optional
	opts := MQTT.NewClientOptions()
	opts.AddBroker(*broker)
	opts.SetClientID("test_pub")
	opts.SetUsername(*user)
	opts.SetPassword(*password)
	opts.SetCleanSession(*cleansess)
	glog.Infoln("Publisher Started")

	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}
	var dvid string
	t0 := time.Now()
	for i := 1; i <= *num; i++ {
		// log.Println("num:", i)
		if *random {
			rand.Seed(time.Now().UnixNano())
			id := rand.Intn(*idmax) + 1
			dvid = "device_" + strconv.Itoa(id)
		} else {
			dvid = *device_id
		}

		topic := fmt.Sprintf("/%s/%s/%s/%s", *companyID, *companyName, *device_name, dvid)
		err := publish(client, *qos, topic, *payload)

		if err != nil {
			glog.Errorln("publish error: ", err)
		}
		s := time.Duration(*sleep)
		time.Sleep(time.Millisecond * s)
	}
	t := time.Since(t0)
	// client.Disconnect(50)
	log.Println("Publisher Done, total time:", t)
	ch := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
	signal.Notify(ch, syscall.SIGINT)
	sig := <-ch
	fmt.Printf("Signal received: %v\n", sig)
}

func publish(client MQTT.Client, qos int, top, payload string) error {
	(*messageCount)++
	p := fmt.Sprintf("[%d]_%s", *messageCount, payload)
	token := client.Publish(top, byte(qos), false, p)
	if token.Error() != nil {
		return token.Error()
	}
	token.WaitTimeout(time.Millisecond * 100)
	glog.Errorf("count: %d, [OnSend] topic=>%s message=>%s", *messageCount, top, p)
	return nil
}
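One problem in the publisher above: token.Error() is checked before the token completes, so publish failures are reported as success and the timeout result is ignored. A hedged rewrite of the same publish function, checking the error only after waiting:

```go
func publish(client MQTT.Client, qos int, top, payload string) error {
	(*messageCount)++
	p := fmt.Sprintf("[%d]_%s", *messageCount, payload)
	token := client.Publish(top, byte(qos), false, p)
	// Wait (with a timeout) for the publish flow to finish, then check the
	// error; checking before waiting races the send and reports nothing.
	if !token.WaitTimeout(time.Millisecond * 100) {
		return fmt.Errorf("publish timed out: topic=%s", top)
	}
	return token.Error()
}
```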

os.Exit(1) in the DefaultErrorHandler

migrated from Bugzilla #462516
status RESOLVED severity normal in component MQTT-Go for 1.1
Reported in version 1.2 on platform PC
Assigned to: Al Stockdill-Mander

On 2015-03-18 21:15:37 -0400, shirou WAKAYAMA wrote:

The DefaultErrorHandler invokes os.Exit(1).
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/tree/oops.go#n47
So if an error happens, not only the MQTT client/connection but also the process itself goes down.

I know this can be overridden by using SetOnConnectionLost(), but this behavior is a little aggressive for the default handler of a library.

Could you just delete os.Exit(1) ?

Thanks!

On 2015-03-23 14:37:36 -0400, Al Stockdill-Mander wrote:

Thanks for that, I have fixed this as part of merging the develop branch into the master. Please note that v0.9.0 has breaking API changes compared to the previous version in master
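In the post-merge API, installing your own handler replaces the default connection-lost behaviour entirely, so an error tears down only the connection rather than the process. A sketch (handler body illustrative):

```go
import (
	"log"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// withLostHandler logs connection loss instead of exiting; the client
// (and the process) keeps running.
func withLostHandler(opts *mqtt.ClientOptions) *mqtt.ClientOptions {
	return opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
		log.Printf("connection lost: %v (client keeps running)", err)
	})
}
```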

Keepalive processing needs two timers

migrated from Bugzilla #485186
status ASSIGNED severity normal in component MQTT-Go for 1.2
Reported in version unspecified on platform All
Assigned to: Al Stockdill-Mander

On 2016-01-04 21:58:22 -0500, Che-Wei Kuo wrote:

(When I typed the keyword "Keepalive" in the summary field, I found several similar topics.)
https://bugs.eclipse.org/bugs/show_bug.cgi?id=442683 MQTT-C
https://bugs.eclipse.org/bugs/show_bug.cgi?id=442691 MQTT-Java
https://bugs.eclipse.org/bugs/show_bug.cgi?id=442692 MQTT-JS

In MQTT-Go, lastContact (in ping.go) is used for both ibound and obound messages (in net.go).

QoS=0 messages should not be delivered on AutoReconnect

migrated from Bugzilla #486959
status RESOLVED severity major in component MQTT-Go for 1.2
Reported in version unspecified on platform PC
Assigned to: Al Stockdill-Mander

On 2016-02-01 13:58:51 -0500, Mark Mindenhall wrote:

When I create a client with SetAutoReconnect(true) in the ClientOptions, the client behaves as if the connection is never lost. When the broker is actually disconnected, c.IsConnected() always returns true, while c.reconnect() tries to reconnect in a separate goroutine.

This means that when the client is used to Publish messages, they are written to the c.obound channel (which buffers up to 100 messages) regardless of QoS, to be delivered if/when the connection is reestablished. However, per the spec (v3.1.1, section 4.3.1), QoS=0 behavior should be as follows: "No response is sent by the receiver and no retry is performed by the sender."

The AutoReconnect feature is out of compliance here, because delivering these on reconnect is most definitely a "retry". Furthermore, not dropping these messages allows QoS=0 messages to possibly prevent messages with higher QoS from being delivered. This is because the internal c.obound channel buffer length is set to 100, which means only the most recent 100 messages will be buffered to publish on reconnect. QoS=0 messages should never prevent QoS=1 or QoS=2 from being delivered in this way.

My current project provides a concrete example of this problem. We publish a heartbeat message with QoS=0 at a configurable interval (e.g., every 10 seconds), and various other mission-critical messages at QoS=1. If we lose connection to the broker, there's no value to our application in delivering the delayed heartbeat messages, and these would eventually prevent more important messages from being delivered.

Marked as "major" due to this feature being out of conformance with the spec.
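The conformance point reduces to a small predicate: only QoS 1 and 2 messages may be queued for redelivery while disconnected. A sketch of that gate as application-level logic (plain Go; names are illustrative):

```go
package main

import "fmt"

// shouldQueue reports whether a message may be buffered for delivery
// after reconnect. Per MQTT v3.1.1 section 4.3.1, QoS 0 has no retry,
// so a QoS 0 message produced while offline must simply be dropped.
func shouldQueue(qos byte, connected bool) bool {
	if connected {
		return false // send immediately, no queueing needed
	}
	return qos > 0
}

func main() {
	fmt.Println(shouldQueue(0, false)) // offline QoS 0: drop
	fmt.Println(shouldQueue(1, false)) // offline QoS 1: buffer for redelivery
	fmt.Println(shouldQueue(2, true))  // connected: just send
}
```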

On 2016-02-08 06:24:48 -0500, Al Stockdill-Mander wrote:

This issue should be resolved by the changes in these commits
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=5687bf8
http://git.eclipse.org/c/paho/org.eclipse.paho.mqtt.golang.git/commit/?id=617c801

The second also allows you to set the size of the buffers on the internal channels, but only when using autoreconnect. Please let me know if this change doesn't work as expected.
