
Comqtt

A lightweight, high-performance MQTT server in Go (v3.0|v3.1.1|v5.0)

Comqtt is an embeddable, high-performance MQTT broker written in Go, with support for distributed clustering and compliance with the MQTT v3.0, v3.1.1, and v5.0 specifications, for the development of IoT and smart-home projects. The server can be used either as a standalone binary or embedded as a library in your own projects. Comqtt message throughput is comparable with everyone's favourites such as Mosquitto, Mosca, and VerneMQ.

👍 Comqtt code is cleaner and easier to read, customize, and extend than other MQTT broker code! 😍

👍 If you like this project or find it useful, please give it a STAR to let more people know about it, and contribute to its maintenance! 💪

📦 💬 See GitHub Discussions for discussions about releases

Ongoing discussion about current and future releases can be found at https://github.com/wind-c/comqtt/discussions

Developers in China can join the WeChat group discussion at #32

What is MQTT?

MQTT stands for MQ Telemetry Transport. It is a publish/subscribe, extremely simple and lightweight messaging protocol, designed for constrained devices and low-bandwidth, high-latency or unreliable networks. Learn more

When is this repo updated?

Unless it's a critical issue, new releases typically go out over the weekend. At some point in the future this repo may be converted to an organisation, or collaborators added if the project continues to grow.

Comqtt Features

  • Full MQTT v5 feature compliance, with compatibility for MQTT v3.1.1 and v3.0.0.
  • TCP, Websocket (including SSL/TLS), and Dashboard listeners.
  • File-based server, auth, storage, and bridge configuration; click to see config examples.
  • Auth and ACL plugins supporting Redis, HTTP, MySQL, and PostgreSQL.
  • Packets bridged to Kafka according to configured rules.
  • Single-machine mode supporting local storage via BBolt, Badger, and Redis.
  • Hook design pattern making it easy to develop plugins for auth, bridge, and storage.
  • Cluster support based on Gossip and Raft; see the Cluster README.

Roadmap

  • Dashboard.
  • Rule engine.
  • Bridge (other MQTT brokers, RocketMQ, RabbitMQ).
  • Enhanced Metrics support.
  • CoAP.

Quick Start

Running the Broker with Go

Comqtt can be used as a standalone broker. Simply check out this repository and run the cmd/single/main.go entrypoint in the cmd folder, which will expose tcp (:1883), websocket (:1882), and dashboard (:8080) listeners.

Build

cd cmd
go build -o comqtt ./single/main.go

Start

./comqtt
or
./comqtt --conf=./config/single.yml

If you want the bridge and multiple authentication capabilities, you need to start with the configuration file. Click to see a config example.

Using Docker

A simple Dockerfile is provided for running the cmd/single/main.go Websocket, TCP, and Stats server:

docker build -t comqtt:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 comqtt:latest

Developing with Comqtt

Importing as a package

Importing Comqtt as a package requires just a few lines of code to get started.

import (
  "log"

  "github.com/wind-c/comqtt/v2/mqtt"
  "github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
  "github.com/wind-c/comqtt/v2/mqtt/listeners"
)

func main() {
  // Create the new MQTT Server.
  server := mqtt.New(nil)

  // Allow all connections.
  _ = server.AddHook(new(auth.AllowHook), nil)

  // Create a TCP listener on a standard port.
  tcp := listeners.NewTCP("t1", ":1883", nil)
  err := server.AddListener(tcp)
  if err != nil {
    log.Fatal(err)
  }

  err = server.Serve()
  if err != nil {
    log.Fatal(err)
  }
}

Examples of running the broker with various configurations can be found in the mqtt/examples folder.

Network Listeners

The server comes with a variety of pre-packaged network listeners which allow the broker to accept connections on different protocols. The current listeners are:

Listener Usage
listeners.NewTCP A TCP listener
listeners.NewUnixSock A Unix Socket listener
listeners.NewNet A net.Listener listener
listeners.NewWebsocket A Websocket listener
listeners.NewHTTPStats An HTTP $SYS info dashboard
listeners.NewHTTPHealthCheck An HTTP healthcheck listener to provide health check responses for e.g. cloud infrastructure

Use the listeners.Listener interface to develop new listeners. If you do, please let us know!

A *listeners.Config may be passed to configure TLS.

Examples of usage can be found in the mqtt/examples folder or cmd/single/main.go.

Server Options and Capabilities

A number of configurable options are available which can be used to alter the behaviour or restrict access to certain features in the server.

server := mqtt.New(&mqtt.Options{
  Capabilities: mqtt.Capabilities{
    MaximumSessionExpiryInterval: 3600,
    Compatibilities: mqtt.Compatibilities{
      ObscureNotAuthorized: true,
    },
  },
  ClientNetWriteBufferSize: 1024,
  ClientNetReadBufferSize: 1024,
  SysTopicResendInterval: 10,
})

Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options.

Event Hooks

A universal event hooks system allows developers to hook into various parts of the server and client life cycle to add and modify functionality of the broker. These universal hooks are used to provide everything from authentication, persistent storage, to debugging tools.

Hooks are stackable - you can add multiple hooks to a server, and they will be run in the order they were added. Some hooks modify values, and these modified values will be passed to the subsequent hooks before being returned to the runtime code.

Type Import Info
Access Control mqtt/hooks/auth.AllowHook Allow access to all connecting clients and read/write to all topics.
Access Control mqtt/hooks/auth.Auth Rule-based access control ledger.
Persistence mqtt/hooks/storage/bolt Persistent storage using BoltDB (deprecated).
Persistence mqtt/hooks/storage/badger Persistent storage using BadgerDB.
Persistence mqtt/hooks/storage/redis Persistent storage using Redis.
Debugging mqtt/hooks/debug Additional debugging output to visualise packet flow.

Many of the internal server functions are now exposed to developers, so you can make your own Hooks by using the above as examples. If you do, please Open an issue and let everyone know!

Authentication

Currently, Auth and ACL support the following back-end storage: Redis, MySQL, PostgreSQL, and HTTP. User passwords support the following encryption algorithms: 0 none, 1 bcrypt(cost=10), 2 md5, 3 sha1, 4 sha256, 5 sha512, 6 hmac-sha1, 7 hmac-sha256, 8 hmac-sha512.
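As an illustration of the digest-style options (4 sha256 and 7 hmac-sha256), such hashes can be produced with the Go standard library. This is a sketch only; the exact encoding comqtt's auth plugins expect (hex vs base64) should be verified against the plugin source:

```go
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// sha256Hex returns the hex-encoded SHA-256 digest of a password (option 4).
func sha256Hex(password string) string {
	sum := sha256.Sum256([]byte(password))
	return hex.EncodeToString(sum[:])
}

// hmacSHA256Hex returns the hex-encoded HMAC-SHA256 of a password
// under a shared key (option 7).
func hmacSHA256Hex(key, password string) string {
	mac := hmac.New(sha256.New, []byte(key))
	mac.Write([]byte(password))
	return hex.EncodeToString(mac.Sum(nil))
}

func main() {
	fmt.Println(sha256Hex("VeryVerySecretPa55w0rd"))
	fmt.Println(hmacSHA256Hex("secret-key", "VeryVerySecretPa55w0rd"))
}
```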

The following uses PostgreSQL and the bcrypt encryption algorithm as an example.

Postgresql

The schema required is as follows:

BEGIN;
CREATE TABLE mqtt_user (
    id serial PRIMARY KEY,
    username TEXT NOT NULL UNIQUE,
    password TEXT NOT NULL,
    allow smallint DEFAULT 1 NOT NULL,
    created timestamp with time zone DEFAULT NOW(),
    updated timestamp
);

CREATE TABLE mqtt_acl(
    id serial PRIMARY KEY,
    username TEXT NOT NULL,
    topic TEXT NOT NULL,
    access smallint DEFAULT 3 NOT NULL,
    created timestamp with time zone DEFAULT NOW(),
    updated timestamp
);
CREATE INDEX mqtt_acl_username_idx ON mqtt_acl(username);
COMMIT;

Note that passwords for MQTT clients stored in PostgreSQL are bcrypt hashes. Therefore, to create or update MQTT clients you can use this Python snippet:

import bcrypt
salt = bcrypt.gensalt(rounds=10)
hashed = bcrypt.hashpw(b"VeryVerySecretPa55w0rd", salt)
print(f"Password hash for MQTT client: {hashed.decode()}")

Go snippet:

import (
	"fmt"

	"golang.org/x/crypto/bcrypt"
)

hashed, err := bcrypt.GenerateFromPassword(pwd, bcrypt.DefaultCost) // DefaultCost is 10
if err != nil {
	return err
}
fmt.Printf("Password hash for MQTT client: %s\n", hashed)

Access Control

Allow Hook

By default, Comqtt uses a DENY-ALL access control rule. To allow connections, this must be overwritten using an Access Control hook. The simplest of these hooks is auth.AllowHook, which applies ALLOW-ALL rules to all connections, subscriptions, and publishing. It's also the simplest hook to use:

server := mqtt.New(nil)
_ = server.AddHook(new(auth.AllowHook), nil)

Don't do this if you are exposing your server to the internet or untrusted networks - it should really be used for development, testing, and debugging only.

Auth Ledger

The Auth Ledger hook provides a sophisticated mechanism for defining access rules in a struct format. Auth ledger rules come in two forms: Auth rules (connection), and ACL rules (publish/subscribe).

Auth rules have 4 optional criteria and an assertion flag:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Password password of the connecting client
Remote the remote address or ip of the client
Allow true (allow this user) or false (deny this user)

ACL rules have 3 optional criteria and a filter match:

Criteria Usage
Client client id of the connecting client
Username username of the connecting client
Remote the remote address or ip of the client
Filters an array of filters to match

Rules are processed in index order (0,1,2,3), returning on the first matching rule. See hooks/auth/ledger.go to review the structs.

server := mqtt.New(nil)
err := server.AddHook(new(auth.Hook), &auth.Options{
  Ledger: &auth.Ledger{
    Auth: auth.AuthRules{ // Auth disallows all by default
      {Username: "peach", Password: "password1", Allow: true},
      {Username: "melon", Password: "password2", Allow: true},
      {Remote: "127.0.0.1:*", Allow: true},
      {Remote: "localhost:*", Allow: true},
    },
    ACL: auth.ACLRules{ // ACL allows all by default
      {Remote: "127.0.0.1:*"}, // local superuser allow all
      {
        // user melon can read and write to their own topic
        Username: "melon", Filters: auth.Filters{
          "melon/#":   auth.ReadWrite,
          "updates/#": auth.WriteOnly, // can write to updates, but can't read updates from others
        },
      },
      {
        // Otherwise, no clients have publishing permissions
        Filters: auth.Filters{
          "#":         auth.ReadOnly,
          "updates/#": auth.Deny,
        },
      },
    },
  },
})

The ledger can also be stored as JSON or YAML and loaded using the Data field:

err = server.AddHook(new(auth.Hook), &auth.Options{
    Data: data, // build ledger from byte slice: yaml or json
})

See examples/auth/encoded/main.go for more information.

Persistent Storage

Redis

A basic Redis storage hook is available which provides persistence for the broker. It can be added to the server in the same fashion as any other hook, with several options. It uses github.com/go-redis/redis/v8 under the hood, and is completely configurable through the Options value.

err := server.AddHook(new(redis.Hook), &redis.Options{
  Options: &rv8.Options{
    Addr:     "localhost:6379", // default redis address
    Password: "",               // your password
    DB:       0,                // your redis db
  },
})
if err != nil {
  log.Fatal(err)
}

For more information on how the redis hook works, or how to use it, see the mqtt/examples/persistence/redis/main.go or hooks/storage/redis code.

Badger DB

There's also a BadgerDB storage hook if you prefer file-based storage. It can be added and configured in much the same way as the other hooks (with somewhat fewer options).

err := server.AddHook(new(badger.Hook), &badger.Options{
  Path: badgerPath,
})
if err != nil {
  log.Fatal(err)
}

For more information on how the badger hook works, or how to use it, see the mqtt/examples/persistence/badger/main.go or hooks/storage/badger code.

There is also a BoltDB hook which has been deprecated in favour of Badger, but if you need it, check mqtt/examples/persistence/bolt/main.go.

Developing with Event Hooks

Many hooks are available for interacting with the broker and client lifecycle. The function signatures for all the hooks and mqtt.Hook interface can be found in mqtt/hooks.go.

The most flexible event hooks are OnPacketRead, OnPacketEncode, and OnPacketSent - these hooks can be used to control and modify all incoming and outgoing packets.

Function Usage
OnStarted Called when the server has successfully started.
OnStopped Called when the server has successfully stopped.
OnConnectAuthenticate Called when a user attempts to authenticate with the server. An implementation of this method MUST be used to allow or deny access to the server (see hooks/auth/allow_all or basic). It can be used in custom hooks to check connecting users against an existing user database. Returns true if allowed.
OnACLCheck Called when a user attempts to publish or subscribe to a topic filter. As above.
OnSysInfoTick Called when the $SYS topic values are published out.
OnConnect Called when a new client connects, may return an error or packet code to halt the client connection process.
OnSessionEstablish Called immediately after a new client connects and authenticates and immediately before the session is established and CONNACK is sent.
OnSessionEstablished Called when a new client successfully establishes a session (after OnConnect).
OnDisconnect Called when a client is disconnected for any reason.
OnAuthPacket Called when an auth packet is received. It is intended to allow developers to create their own mqtt v5 Auth Packet handling mechanisms. Allows packet modification.
OnPacketRead Called when a packet is received from a client. Allows packet modification.
OnPacketEncode Called immediately before a packet is encoded to be sent to a client. Allows packet modification.
OnPacketSent Called when a packet has been sent to a client.
OnPacketProcessed Called when a packet has been received and successfully handled by the broker.
OnSubscribe Called when a client subscribes to one or more filters. Allows packet modification.
OnSubscribed Called when a client successfully subscribes to one or more filters.
OnSelectSubscribers Called when subscribers have been collected for a topic, but before shared subscription subscribers have been selected. Allows recipient modification.
OnUnsubscribe Called when a client unsubscribes from one or more filters. Allows packet modification.
OnUnsubscribed Called when a client successfully unsubscribes from one or more filters.
OnPublish Called when a client publishes a message. Allows packet modification.
OnPublished Called when a client has published a message to subscribers.
OnPublishDropped Called when a message to a client is dropped before delivery, such as if the client is taking too long to respond.
OnRetainMessage Called when a published message is retained.
OnRetainPublished Called when a retained message is published to a client.
OnQosPublish Called when a publish packet with Qos >= 1 is issued to a subscriber.
OnQosComplete Called when the Qos flow for a message has been completed.
OnQosDropped Called when an inflight message expires before completion.
OnPacketIDExhausted Called when a client runs out of unused packet ids to assign.
OnWill Called when a client disconnects and intends to issue a will message. Allows packet modification.
OnWillSent Called when an LWT message has been issued from a disconnecting client.
OnClientExpired Called when a client session has expired and should be deleted.
OnRetainedExpired Called when a retained message has expired and should be deleted.
StoredClients Returns clients, eg. from a persistent store.
StoredSubscriptions Returns client subscriptions, eg. from a persistent store.
StoredInflightMessages Returns inflight messages, eg. from a persistent store.
StoredRetainedMessages Returns retained messages, eg. from a persistent store.
StoredSysInfo Returns stored system info values, eg. from a persistent store.

If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need OnACLCheck and OnConnectAuthenticate.

Direct Publish

To publish a basic message to a topic from within the embedding application, you can use the server.Publish(topic string, payload []byte, retain bool, qos byte) error method.

err := server.Publish("direct/publish", []byte("packet scheduled message"), false, 0)

The Qos byte in this case is only used to set the upper QoS limit available for subscribers, as per the MQTT v5 spec.

Packet Injection

If you want more control, or want to set specific MQTT v5 properties and other values, you can create your own publish packets from a client of your choice. This method allows you to inject MQTT packets (not just publish) directly into the runtime as though they had been received by a specific client. Most of the time you'll want to use the special client flag inline=true, as it has unique privileges: it bypasses all ACL and topic validation checks, meaning it can even publish to $SYS topics.

Packet injection can be used for any MQTT packet, including ping requests, subscriptions, etc. And because the Client structs and methods are now exported, you can even inject packets on behalf of a connected client (if you have very custom requirements).

cl := server.NewClient(nil, "local", "inline", true)
server.InjectPacket(cl, packets.Packet{
  FixedHeader: packets.FixedHeader{
    Type: packets.Publish,
  },
  TopicName: "direct/publish",
  Payload: []byte("scheduled message"),
})

MQTT packets still need to be correctly formed, so refer to the test packets catalogue and MQTTv5 Specification for inspiration.

See the hooks example to see this feature in action.

Testing

Unit Tests

Comqtt tests over a thousand scenarios with thoughtfully hand-written unit tests to ensure each function does exactly what we expect. You can run the tests using go:

go test --cover ./...

Paho Interoperability Test

You can check the broker against the Paho Interoperability Test by starting the broker using examples/paho/main.go, and then running the mqtt v5 and v3 tests with python3 client_test5.py from the interoperability folder.

Note that there are currently a number of outstanding issues regarding false negatives in the paho suite, and as such, certain compatibility modes are enabled in the paho/main.go example.

Performance Benchmarks

Comqtt performance is comparable with popular brokers such as Mosquitto, EMQX, and others.

Performance benchmarks were tested using MQTT-Stresser on an Apple MacBook Air M2, using cmd/main.go default settings. Taking into account bursts of high and low throughput, the median scores are the most useful. Higher is better.

The values presented in the benchmark are not representative of true messages per second throughput. They rely on an unusual calculation by mqtt-stresser, but are usable as they are consistent across all brokers. Benchmarks are provided as a general performance expectation guideline only. Comparisons are performed using out-of-the-box default configurations.

mqtt-stresser -broker tcp://localhost:1883 -num-clients=2 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Comqtt v2.0.0 124,772 125,456 124,614 314,461 313,186 311,910
Mosquitto v2.0.15 155,920 155,919 155,918 185,485 185,097 184,709
EMQX v5.0.11 156,945 156,257 155,568 17,918 17,783 17,649
Rumqtt v0.21.0 112,208 108,480 104,753 135,784 126,446 117,108

mqtt-stresser -broker tcp://localhost:1883 -num-clients=10 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Comqtt v2.0.0 45,615 30,129 21,138 232,717 86,323 50,402
Mosquitto v2.0.15 42,729 38,633 29,879 23,241 19,714 18,806
EMQX v5.0.11 21,553 17,418 14,356 4,257 3,980 3,756
Rumqtt v0.21.0 42,213 23,153 20,814 49,465 36,626 19,283

Million Message Challenge (hit the server with 1 million messages immediately):

mqtt-stresser -broker tcp://localhost:1883 -num-clients=100 -num-messages=10000

Broker publish fastest median slowest receive fastest median slowest
Comqtt v2.0.0 51,044 4,682 2,345 72,634 7,645 2,464
Mosquitto v2.0.15 3,826 3,395 3,032 1,200 1,150 1,118
EMQX v5.0.11 4,086 2,432 2,274 434 333 311
Rumqtt v0.21.0 78,972 5,047 3,804 4,286 3,249 2,027

Not sure what's going on with EMQX here; perhaps the docker out-of-the-box settings are not optimal, so take it with a pinch of salt, as we know for a fact it's a solid piece of software.

Contributions

Contributions and feedback are both welcomed and encouraged! Open an issue to report a bug, ask a question, or make a feature request.

comqtt's People

Contributors

asbjorn, barribarri20, fxk2006, jlundy2, kenuestar, mochi-co, perbu, smugg99, suixinio, trozensztrauch, werbenhu, wind-c, wwhai


comqtt's Issues

panic during performance benchmark

Benchmark command:
mqtt-stresser -broker tcps://localhost:18883 -skip-tls-verification -num-clients 10 -num-messages 80000 -rampup-delay 1s -rampup-size 10 -global-timeout 180s -timeout 20s

It panics when it gets the "An existing connection was forcibly closed by the remote host" error message.

2022/11/07 18:56:43 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:60634: wsasend: An existing connection was forcibly closed by the remote host.
2022/11/07 18:56:43 error writing to buffer io.Writer; write tcp 127.0.0.1:18883->127.0.0.1:60633: wsasend: An existing connection was forcibly closed by the remote host.
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xc0000005 code=0x0 addr=0x0 pc=0x46d467]

goroutine 39 [running]:
github.com/wind-c/comqtt/server/internal/circ.(*Writer).Write(0x0, {0xc001557b00, 0x4f, 0xc0000b7d28?})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/internal/circ/writer.go:80 +0x27
github.com/wind-c/comqtt/server/internal/clients.(*Client).WritePacket(, {{0x4d, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/internal/clients/clients.go:525 +0x17a
github.com/wind-c/comqtt/server.(*Server).writeClient(
, , {{0x4d, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/server.go:542 +0x5d
github.com/wind-c/comqtt/server.(*Server).publishToSubscribers(
, {{0x4d, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/server.go:665 +0x794
github.com/wind-c/comqtt/server.(*Server).processPublish(_, , {{0x4d, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/process.go:149 +0x678
github.com/wind-c/comqtt/server.(*Server).processPacket(
, _, {{0x4d, 0x3, 0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/process.go:28 +0x138
github.com/wind-c/comqtt/server/internal/clients.(*Client).Read(0xc0000b0280, 0xc0000b9940)
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/internal/clients/clients.go:401 +0x19d
github.com/wind-c/comqtt/server.(*Server).EstablishConnection(0xc000120370, {0x59cd6b, 0x2}, {0x62af78, 0xc0000a6700}, {0x628a10?, 0x83f3e0?})
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/server.go:362 +0x10f2
github.com/wind-c/comqtt/server/listeners.(*TCP).Serve.func1()
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/listeners/tcp.go:113 +0x3d
created by github.com/wind-c/comqtt/server/listeners.(*TCP).Serve
C:/Users/test/go/pkg/mod/github.com/wind-c/[email protected]/server/listeners/tcp.go:112 +0xea

Running the cluster without making changes causes an unclean shutdown

Hi there Wind.

Excellent work on the tests. 🥇 I'm very happy about this. We're 99% of the way.

I struggle a bit with that leak test I did. So what I do is run a cluster node with the following arguments:
-node-name testnode -members localhost:7946 -raft-bootstrap true -storage-way memory

This runs cleanly, but if I shut it down (Ctrl-C), the following happens:

12:39PM INF comqtt server started
^C12:39PM WRN caught signal, stopping...
12:39PM INF stopping raft...
12:39PM ??? 2023-08-01T10:39:42.648+0200 [ERROR] raft: failed to take snapshot: error="nothing new to snapshot"
12:39PM WRN failed to create snapshot!
12:39PM INF raft stopped
12:39PM INF stopping node...
12:39PM ??? 2023/08/01 10:39:42 [INFO] serf: EventMemberLeave: testnode 127.0.0.1
12:39PM ERR not graceful stop
12:39PM DBG system event loop halted

I suspect this causes the leak test, which basically does this, not to complete correctly, leaving a couple of goroutines hanging. Is the error "nothing new to snapshot" really an error?

All the best,
Per.

server.Events.OnSubscribe: no message printed on subscribe

I added server.Events.OnSubscribe in main.go:
// Add OnSubscribe Event Hook
server.Events.OnSubscribe = func(filter string, cl events.Client, qos byte, isFirst bool) {
fmt.Printf("<< OnSubscribe client subscribed %s: %s %d %t \n", cl.ID, filter, qos, isFirst)
}
But nothing is printed when a client subscribes to a topic.

Is there an example of how to use subscribe? The client cannot receive messages on the subscribed topic.

Persist messages to kafka

Hello.

We're considering adding support to persist all MQTT topic matching a certain filter, to Kafka. Not sure if this is relevant for the project. If it is relevant for the project, let me know and we'll work together as we implement it according to your guidelines.

If not, which is perfectly fine, we'll just do it in a private fork.

All the best,

Per.

Use of Init to launch http server.

In both the single and cluster main() there are web servers started in init() on port 6060.

This complicates things when testing, as there is no easy way to shut the server down again. The server responds with a 404, and disabling it doesn't seem to affect the server.

Are you ok with removing this?

Missing OnPublishedWithSharedFilters in debug hook

It seems that at runtime the debug hook panics due to the lack of an OnPublishedWithSharedFilters implementation. This happens only when a client publishes.

Logs

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x98 pc=0x1a137e8]

goroutine 191 [running]:
github.com/wind-c/comqtt/v2/mqtt/hooks/debug.(*Hook).OnPublishedWithSharedFilters(_, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...}, ...}, ...}, ...)
        <autogenerated>:1 +0x28
github.com/wind-c/comqtt/v2/mqtt.(*Hooks).OnPublishedWithSharedFilters(_, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...}, ...}, ...}, ...)
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/hooks.go:558 +0x14c
github.com/wind-c/comqtt/v2/mqtt.(*Server).PublishToSubscribers(_, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...}, ...}, ...}, ...)
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:961 +0x2ec
github.com/wind-c/comqtt/v2/mqtt.(*Server).publishToSubscribers(...)
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:925
github.com/wind-c/comqtt/v2/mqtt.(*Server).processPublish(_, _, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, ...}, ...}, ...}, ...})
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:875 +0x7e5
github.com/wind-c/comqtt/v2/mqtt.(*Server).processPacket(_, _, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, ...}, ...}, ...}, ...})
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:627 +0x128
github.com/wind-c/comqtt/v2/mqtt.(*Server).receivePacket(_, _, {{{{0x0, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, 0x0, ...}, ...}, ...}, ...})
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:432 +0x58
github.com/wind-c/comqtt/v2/mqtt.(*Client).Read(0xc000ec8280, 0xc0012457c0)
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/clients.go:362 +0x15d
github.com/wind-c/comqtt/v2/mqtt.(*Server).attachClient(0xc000af5860, 0xc000ec8280, {0x1f055d9, 0x3})
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:387 +0x605
github.com/wind-c/comqtt/v2/mqtt.(*Server).EstablishConnection(0x0?, {0x1f055d9, 0x3}, {0x2298db8?, 0xc001132000?})
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/server.go:327 +0x5e
github.com/wind-c/comqtt/v2/mqtt/listeners.(*TCP).Serve.func1()
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/listeners/tcp.go:84 +0x43
created by github.com/wind-c/comqtt/v2/mqtt/listeners.(*TCP).Serve in goroutine 44
        /home/czort/go/pkg/mod/github.com/wind-c/comqtt/[email protected]/mqtt/listeners/tcp.go:83 +0x128

Goleak failing

One last thing. Goleak seems to be in use. This sounds like a good idea, but it doesn't seem to be implemented correctly yet.

I've got no experience with it so I'm not sure if it serves a purpose. From the somewhat sparse documentation of Goleak, it seems the idea is to run some tests and then it'll go over the runtime and check if there are goroutines left.

If this is correct, I suggest we just delete main_test.go here.

Alternatively, I suspect we might need to refactor main() so it'll hand over control after setting up a context to something like main2(), which would get the done-channel (or a ctx). Then write a test that brings up the process, sends a couple of messages, and after that closes the done-channel. Goleak should then be able to check if we are able to clean up correctly.

Per.

About client lifecycle

Description

An event interface named OnConnected should be added, because there is currently only OnConnect, which seems suited only to doing something BEFORE the connection is established. When the Auth plugin is enabled, the event may not be triggered if authentication fails.

Such as below:

type Events struct {
	OnProcessMessage // published message received before evaluation.
	OnMessage        // published message received.
	OnError          // server error.
	OnConnect        // client ready to connect.
	OnConnected      // client connected.
	OnDisconnect     // client disconnected.
	OnSubscribe      // topic subscription created.
	OnUnsubscribe    // topic subscription removed.
}

Questions

  1. Will this project be maintained long-term?
  2. Why is the data in the readme highly similar to https://github.com/mochi-co/mqtt?
Mochi | Mosquitto | EMQX | VerneMQ | Mosca
-- | -- | -- | -- | --
SEND Max | 36505 | 30597 | 27202 | 32782 | 30125
  3. What is the relationship between comqtt and mochi-co/mqtt?

Three-machine cluster: co-001 reports errors for no apparent reason

This is a cluster of three machines; I only added the SSL service, and co-001 reports errors for no apparent reason. I deployed in the morning, with no connection requests at all, and by the afternoon it reported this error. I hope the maintainers can look into the stability.

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x8c4b6b]

goroutine 1846 [running]:
github.com/wind-c/comqtt/server/internal/clients.(*Client).ReadConnectPacket(_, _)
/Users/w/Documents/comqtt-main/server/internal/clients/clients.go:425 +0x6b
github.com/wind-c/comqtt/server.(*Server).readConnectionPacket(_, _)
/Users/w/Documents/comqtt-main/server/server.go:382 +0xc8
github.com/wind-c/comqtt/server.(*Server).EstablishConnection(0xc000184840, {0xc60db1, 0x2}, {0xd759a0, 0xc0004be000}, {0xd6ec08?, 0x121b3d8?})
/Users/w/Documents/comqtt-main/server/server.go:305 +0x728
github.com/wind-c/comqtt/server/listeners.(*TCP).Serve.func1()
/Users/w/Documents/comqtt-main/server/listeners/tcp.go:113 +0x3d
created by github.com/wind-c/comqtt/server/listeners.(*TCP).Serve
/Users/w/Documents/comqtt-main/server/listeners/tcp.go:112 +0xea
2022/11/18 18:15:45 [INFO] memberlist: Suspect co-001 has failed, no acks received

About building a cluster across machines: two machines with public IPs form a cross-machine cluster; they have joined, but then it keeps reporting "Suspect co-001 has failed, no acks received".

Currently there are just two public-network machines forming the cross-machine cluster.

conf.yml on one machine:

cluster:
  node-name: co-009 #The name of this node. Must be unique in the cluster. If not set, the local hostname is used.
  bind-port: 1886 #This port is used for both UDP and TCP gossip, for member discovery and communication.
  members: localhost:1886,172.17.224.50:1886,172.17.224.51:1886 #seed member list, e.g. 192.168.0.103:7946,192.168.0.104:7946
  queue-depth: 1024000 #size of memberlist's internal channel which handles UDP messages.
  raft-port: 1887
  raft-dir: ./raft/node1

conf.yml on the other machine:

cluster:
  node-name: co-001 #The name of this node. Must be unique in the cluster. If not set, the local hostname is used.
  bind-port: 1886 #This port is used for both UDP and TCP gossip, for member discovery and communication.
  members: localhost:1886,172.17.224.50:1886,172.17.224.51:1886 #seed member list, e.g. 192.168.0.103:7946,192.168.0.104:7946
  queue-depth: 1024000 #size of memberlist's internal channel which handles UDP messages.
  raft-port: 1887
  raft-dir: ./raft/node1

On both machines I configured the same IP list in members.

Error output:

A node has joined: co-009
A node has joined: co-001
Local member 172.17.224.50:1886
gogo BootstrapRaft
2022-10-11T10:20:51.812+0800 [INFO] raft: initial configuration: index=1 servers="[{Suffrage:Voter ID:co-009 Address:172.17.224.50:1887}]"
2022-10-11T10:20:51.812+0800 [INFO] raft: entering follower state: follower="Node at 172.17.224.50:1887 [Follower]" leader-address= leader-id=
Cluster Node Created!
Mqtt Server Started!
2022-10-11T10:20:53.139+0800 [WARN] raft: heartbeat timeout reached, starting election: last-leader-addr= last-leader-id=
2022-10-11T10:20:53.140+0800 [INFO] raft: entering candidate state: node="Node at 172.17.224.50:1887 [Candidate]" term=7
2022-10-11T10:20:53.146+0800 [INFO] raft: election won: tally=1
2022-10-11T10:20:53.146+0800 [INFO] raft: entering leader state: leader="Node at 172.17.224.50:1887 [Leader]"
2022/10/11 10:20:53 [INFO] memberlist: Suspect co-001 has failed, no acks received
2022/10/11 10:20:56 [INFO] memberlist: Suspect co-001 has failed, no acks received
2022/10/11 10:20:57 [INFO] memberlist: Marking co-001 as failed, suspect timeout reached (0 peer confirmations)
A node has left: co-001
2022/10/11 10:21:00 [INFO] memberlist: Suspect co-001 has failed, no acks received

The nodes do join; there is just no response afterwards. I don't know whether the config file is wrong — the error is reported from internal code. If my conf.yml is misconfigured, I hope the author can provide a correct conf.yml example. Thanks.
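One thing worth checking, as a guess: memberlist gossip uses both TCP and UDP on bind-port, and "no acks received" usually indicates that UDP probes are being dropped between the hosts, so make sure port 1886/UDP is open in both directions. Two config details also look suspicious: listing localhost in members, and both nodes using raft-dir: ./raft/node1. A possible corrected conf.yml for the second node (untested, based only on the comments in the file above):

```
cluster:
  node-name: co-001                                #unique per node
  bind-port: 1886                                  #open for BOTH TCP and UDP between the hosts
  members: 172.17.224.50:1886,172.17.224.51:1886   #list the real peer addresses, not localhost
  queue-depth: 1024000
  raft-port: 1887                                  #must also be reachable between the hosts
  raft-dir: ./raft/co-001                          #give each node its own raft directory
```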

Development

Hi.
I've been spending some more time in the code base and I have a couple of suggestions. I'm offering to help here if we want to do this:

  1. Protect the main branch and have all development go through PRs. The main reason for this is:
  2. Have GitHub refuse to merge PRs that have failing tests.

Having failing tests in main for more than a few hours is not a great look and also makes it really hard to work on the codebase.

Before implementing this the tests need to be all green, though. I'll see what I can do, I'm new to the codebase and I'm not always sure what the issue is.

All the best,

Per.

Dropping messages in OnPublish

I have been trying to figure out if there is a way to drop messages in the OnPublish hook — both to drop them silently (i.e. let the client believe the message was published normally) and to drop them with some error state sent back to the client.

comqtt:cl gateways are missing

Hi,

I've been using your software for a few months and it's genius! Thank you for your work.

I experienced an issue: I have two gateways and one is missing from "comqtt:cl". The funny thing is that I still get updates from the missing gateway, and the last messages from comqtt are for both gateways:
{
"action": "connect",
"timestamp": 1711650090
}
{
"action": "connect",
"timestamp": 1711650090
}

We are using this software for a new project and I wanted to ask if there is a way to fund your work?

Best Regards,
René

Enable "hacktoberfest" contributions?

Hi!

Can we either add the topic "hacktoberfest" or add the PR label "hacktoberfest-accepted" to attract contributions during the global "hacktoberfest" movement?

Cluster-forwarded PUBLISH messages fail to parse

Hello, while testing the cluster feature I found that PUBLISH messages are not relayed between cluster nodes correctly.
Debugging showed that the receiving broker fails to parse the forwarded byte stream back into a PUBLISH packet.
The problematic code is as follows:

func (a *Agent) processRelayMsg(msg *message.Message) {
···
	case packets.Publish:
		pk := packets.Packet{FixedHeader: packets.FixedHeader{Type: packets.Publish}}
		pk.ProtocolVersion = msg.ProtocolVersion
		pk.Origin = msg.ClientID
		pk.FixedHeader.Decode(msg.Payload[0])                     // Unpack fixedheader.
		if err := pk.PublishDecode(msg.Payload[2:]); err == nil { // Unpack skips fixedheader
			a.mqttServer.PublishToSubscribers(pk)
			OnPublishPacketLog(DirectionInbound, msg.NodeID, msg.ClientID, pk.TopicName, pk.PacketID)
		}
···
}

The parse fails because the Remaining Length is a Variable Byte Integer whose encoded length is not fixed, so msg.Payload[2:] cannot be used to obtain the variable header and payload.

The fix is to decode the remaining length, compute offset = len(msg.Payload) - remaining, and use msg.Payload[offset:] to obtain the variable header and payload.

Below is my modified code, which I have tested myself.

func (a *Agent) processRelayMsg(msg *message.Message) {
	switch msg.Type {
	case message.RaftJoin:
		addr := string(msg.Payload)
		err := a.raftPeer.Join(msg.NodeID, addr)
		OnJoinLog(msg.NodeID, addr, "raft join", err)
	case packets.Subscribe, packets.Unsubscribe:
		a.raftPropose(msg)
	case packets.Publish:
		pk := packets.Packet{FixedHeader: packets.FixedHeader{Type: packets.Publish}}
		pk.ProtocolVersion = msg.ProtocolVersion
		pk.Origin = msg.ClientID
		if err := a.ReadFixedHeader(msg.Payload, &pk.FixedHeader); err != nil {
			return
		}
		offset := len(msg.Payload) - pk.FixedHeader.Remaining
		if err := pk.PublishDecode(msg.Payload[offset:]); err == nil { // Unpack skips fixedheader
			a.mqttServer.PublishToSubscribers(pk)
			OnPublishPacketLog(DirectionInbound, msg.NodeID, msg.ClientID, pk.TopicName, pk.PacketID)
		}
	case packets.Connect:
		//If a client is connected to another node, the client's data cached on the node needs to be cleared
		if existing, ok := a.mqttServer.Clients.Get(msg.ClientID); ok {
			// connection notify from other node
			existing.Stop(packets.ErrSessionTakenOver)
			// clean local session and subscriptions
			a.mqttServer.UnsubscribeClient(existing)
			a.mqttServer.Clients.Delete(msg.ClientID)
		}
		OnConnectPacketLog(DirectionInbound, msg.NodeID, msg.ClientID)
	}
}

func (a *Agent) ReadFixedHeader(b []byte, fh *packets.FixedHeader) error {
	err := fh.Decode(b[0])
	if err != nil {
		return err
	}
	fh.Remaining, err = DecodeLength(b[1:])
	if err != nil {
		return err
	}
	return nil
}

func DecodeLength(b []byte) (n int, err error) {
	// See 1.5.5 Variable Byte Integer decode (non-normative):
	// https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901027
	var multiplier uint32
	var value uint32
	index := 0
	for {
		if index >= len(b) {
			return 0, errors.New("truncated variable byte integer")
		}
		eb := b[index]

		value |= uint32(eb&127) << multiplier
		if value > 268435455 {
			return 0, errors.New("variable byte integer out of range")
		}

		if (eb & 128) == 0 {
			break
		}

		multiplier += 7
		index++
	}

	return int(value), nil
}

Disconnecting while Kafka bridge turned on causes a panic

If bridge-way is set to 1, disconnecting an MQTT client causes the following error:

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x18 pc=0xbbee38]

goroutine 51 [running]:
github.com/wind-c/comqtt/v2/plugin/bridge/kafka.(*Bridge).OnDisconnect(0xc000382960, 0xc00011e780, {0x0, 0x0}, 0x68?)
     /app/plugin/bridge/kafka/kafka.go:279 +0x98
github.com/wind-c/comqtt/v2/mqtt.(*Hooks).OnDisconnect(0xc00030c2a0?, 0xd628c1?, {0x0, 0x0}, 0x3?)
     /app/mqtt/hooks.go:265 +0x13c
github.com/wind-c/comqtt/v2/mqtt.(*Server).attachClient(0xc000382190, 0xc00011e780, {0xd54ca3, 0x3})
    /app/mqtt/server.go:381 +0x7c5
github.com/wind-c/comqtt/v2/mqtt.(*Server).EstablishConnection(0x8747ae?, {0xd54ca3, 0x3}, {0xe712a8?, 0xc000012008?})
     /app/mqtt/server.go:311 +0x65
github.com/wind-c/comqtt/v2/mqtt/listeners.(*TCP).Serve.func1()
     /app/mqtt/listeners/tcp.go:84 +0x47
created by github.com/wind-c/comqtt/v2/mqtt/listeners.(*TCP).Serve
     /app/mqtt/listeners/tcp.go:83 +0x145

Not sure if it's because the client has already disconnected or what.

Authenticating against a MongoDB collection with Bcrypt-hashed passwords

Firstly - thank you for this amazing library.

To save people some trouble if you're looking to use a DB other than Redis or PostgreSQL, here's a quick guide to doing authentication. In my case, I needed to authenticate against users in MongoDB which had been stored by Laravel using Bcrypt. It wasn't entirely clear from the examples, particularly the use of packets.

This is obviously debug code (printing out user/password etc), and there's more involved. But it's a rough overview.

package main

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/wind-c/comqtt/v2/mqtt"
	"github.com/wind-c/comqtt/v2/mqtt/hooks/auth"
	"github.com/wind-c/comqtt/v2/mqtt/hooks/debug"
	"github.com/wind-c/comqtt/v2/mqtt/listeners"
	"github.com/wind-c/comqtt/v2/mqtt/packets"
	"golang.org/x/crypto/bcrypt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type Auth struct {
	mqtt.HookBase
	mongo_client *mongo.Client
	database     string
	collection   string
}

func auth_hook(mongo_client *mongo.Client) *Auth {
	return &Auth{
		mongo_client: mongo_client,
		database:     env("DB_DATABASE", "mycooldb"),
		collection:   env("DB_AUTH_COLLECTION", "users"),
	}
}

func (hook *Auth) ID() string {
	return "mongodb-auth"
}

func (hook *Auth) Provides(b byte) bool {
	return b == mqtt.OnConnectAuthenticate
}

func (hook *Auth) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	log.Printf("Client trying to connect with username: %s, password: %v", string(pk.Connect.Username), string(pk.Connect.Password))

	users := hook.mongo_client.Database(hook.database).Collection(hook.collection)

	var User bson.M
	err := users.FindOne(context.TODO(), bson.M{env("DB_AUTH_USERNAME_FIELD", "username"): string(pk.Connect.Username)}).Decode(&User)
	if err != nil {
		log.Printf("Authentication failed for user %s: %v", string(pk.Connect.Username), err)
		return false // Authentication failed
	}

	password, ok := User[env("DB_AUTH_PASSWORD_FIELD", "password")].(string)
	if !ok {
		log.Printf("Invalid password format for user %s", string(pk.Connect.Username))
		return false
	}

	// Compare the provided password with the stored hashed password
	err = bcrypt.CompareHashAndPassword([]byte(password), []byte(pk.Connect.Password))
	if err != nil {
		log.Printf("Bcrypt hash comparison failed. Invalid password for user %s", string(pk.Connect.Username))
		return false // Authentication failed
	}

	log.Printf("User %s authenticated successfully.", string(pk.Connect.Username))
	return true
}

func env(key, def string) string {
	value := os.Getenv(key)

	if len(value) == 0 {
		return def
	}

	return value
}

func main() {
	done := make(chan bool, 1)
	sigs := make(chan os.Signal, 1)

	// Register the signals to catch
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	go func() {
		sig := <-sigs
		log.Printf("Received signal: %s", sig)
		done <- true
	}()

	// Other stuff

	mongo_uri_flag := flag.String("mongo_uri", env("MONGO_URI", "mongodb://user:pass@localhost:27017/?authSource=admin"), "MongoDB URI")

	mongo_client := mongo_client(*mongo_uri_flag)

	// vars declared elsewhere
	mqtt_server := mqtt.New(&mqtt.Options{
		Capabilities: &mqtt.Capabilities{
			MaximumSessionExpiryInterval: uint32(max_session_expiry_interval),
			Compatibilities: mqtt.Compatibilities{
				ObscureNotAuthorized: obscure_not_authorized,
			},
		},
		ClientNetWriteBufferSize: client_net_write_buffer_size,
		ClientNetReadBufferSize:  client_net_read_buffer_size,
		SysTopicResendInterval:   int64(sys_topic_resend_interval),
	})

	debug_hook := new(debug.Hook)

	err := mqtt_server.AddHook(debug_hook, &debug.Options{})
	if err != nil {
		log.Fatalf("Failed to add debug hook: %v", err)
	}

	err = mqtt_server.AddHook(auth_hook(mongo_client), nil)
	if err != nil {
		log.Fatalf("Failed to add MongoDB authentication hook: %v", err)
	}

	// Start the MQTT server
	go func() {
		if err := mqtt_server.Serve(); err != nil {
			log.Fatalf("Buzz MQTT server failed to start: %v", err)
		}
	}()

	// loads of other stuff

	<-done
	log.Println("Shutting down the server...")

	// Clean up resources here if necessary
	if err := mqtt_server.Close(); err != nil {
		log.Printf("Error shutting down server: %v", err)
	} else {
		log.Println("Server shutdown successfully.")
	}
}

MySQL auth: bad configuration causes panic

Using some code based on cmd/single/main.go with MySQL configuration in .yml, every time a client connected to the broker resulted in "panic: runtime error: invalid memory address or nil pointer dereference"

I eventually traced this to a typo in the field names I had put in my .yml. The result was that the code:
a.authStmt, _ = sqlxDB.Preparex(authSql)
.. was failing as the SQL was invalid.

However there's no code to catch that error and so when the client connects,
err := a.authStmt.QueryRowx(key).Scan(&password, &allow)
.. in OnConnectAuthenticate was crashing.

This is the first time I've worked in Go so I'm not going to attempt to suggest a suitable fix - for now I'll just be careful about config file typos!

mqtt.options配置加载不正确

type Options struct {
// Capabilities defines the server features and behaviour. If you only wish to modify
// several of these values, set them explicitly - e.g.
// server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
Capabilities *Capabilities

// ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer.
ClientNetWriteBufferSize int

// ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer.
ClientNetReadBufferSize int

// Logger specifies a custom configured implementation of zerolog to override
// the servers default logger configuration. If you wish to change the log level,
// of the default logger, you can do so by setting
// 	server := mqtt.New(nil)
// level := new(slog.LevelVar)
// server.Slog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
// 	Level: level,
// }))
// level.Set(slog.LevelDebug)
Logger *slog.Logger

// SysTopicResendInterval specifies the interval between $SYS topic updates in seconds.
SysTopicResendInterval int64

// Enable Inline client to allow direct subscribing and publishing from the parent codebase,
// with negligible performance difference (disabled by default to prevent confusion in statistics).
InlineClient bool

}

The config struct has no yaml tags, so the settings nested under mqtt.options in conf.yaml are not loaded correctly.

Can it run without Redis?

Looking at the configuration, Redis seems to be required; without Redis the program fails to start. Can it run normally without Redis, and if so, what should be changed?

    Thanks. I haven't gotten the cross-machine cluster working yet, but a single cluster works fine. One question: when connecting to Redis, have you ever seen "Error: Connection reset by peer"? Since the cross-machine cluster uses only one Redis service, my secondary node keeps getting "Connection reset by peer" when connecting to the primary Redis. I've been at it for a long time, and it feels like the last missing step.

Originally posted by @skygp in #9 (comment)

Use custom logger

Hello,
Can the default zerolog logger be changed, or at least disabled?

Thanks.

Cross-node message sync loses messages and will likely block subsequent PUBLISHes from syncing to other nodes

I currently have three nodes deployed. Single-node pub/sub works without any problem, but across nodes messages are lost after publishing, and once a message is lost the data sync between nodes blocks and subsequent messages can no longer be published through to the other nodes.
Node configuration:
Node 1: (configuration screenshot)
Node 2: (configuration screenshot)
Node 3: (configuration screenshot)

I'm testing with mqtt-stresser as the client.
Test command: ./build/mqtt-stresser-linux-amd64 -log-level=0 -broker=tcp://127.0.0.1:1887 -broker-sub=tcp://127.0.0.1:1885 -num-clients=1 -num-messages=25
The -broker-sub flag is one I added myself, specifically to test whether cross-node message sync can lose messages.
So far, below 20 messages there is usually no problem; above 20, messages are always lost.
Pub node log: (screenshot)
Sub node log: (screenshot)

Could the maintainers please check whether my configuration is causing the message loss?

When will a rule engine be supported?

I've recently been looking for an MQTT project that supports a rule engine.
The current options are the three under EMQX; lately I've been considering eKuiper.
But they are all SQL-based, and the logic is not very clear.
I'm wondering how this project plans to build its rule engine.
I'd suggest a DSL such as https://github.com/hyperjumptech/grule-rule-engine.
Once work on the rule-engine code starts, I plan to pitch in as well and contribute some ideas and code.
PS: I haven't learned Go yet; once you get started, I'll learn Go along the way.

TestServerBuildAck fails

It looks like the test is broken. It seems to expect the ack to contain some very specific data. The test will pass when
replacing properties with:

	properties := packets.Properties{}

which I suspect is correct, but I'm not 100% sure.

require.Equal(t, properties, ack.Properties)

No valid packet available

When a client subscribes, an error is reported: write: No valid packet available; 0

Partial code:
server := mqtt.New()
tcp := listeners.NewTCP("t1", ":1883")
err := server.AddListener(tcp, &listeners.Config{
	Auth: new(auth.Allow),
})
if err != nil {
	log.Fatal(err)
}

err = server.AddStore(bolt.New("mochi-test.db", &bbolt.Options{
	Timeout: 500 * time.Millisecond,
}))
if err != nil {
	log.Fatal(err)
}

server.Events.OnSubscribe = func(filter string, cl events.Client, qos byte, isFirst bool) {
	log.Printf("<< OnSubscribe %s: %s %v  %t\n", cl.ID, filter, qos, isFirst)
}

server.Events.OnError = func(cl events.Client, err error) {
	log.Printf("OnError %s:%s\n", cl.ID, err.Error())
}

Output:
2022/09/27 07:16:39 << OnSubscribe mqttx_1: testtopic/26 0 true
2022/09/27 07:16:39 OnError mqttx_1:write: No valid packet available; 0

Postgres schema

Hey,

Hope you are doing well!
Could you please help me find the best way to create the Postgres database/tables?
If possible, could you share a Postgres schema?

Thank you in advance,
Alex

Question

Hello.
I'd like to ask: after a client passes auth, how can I store some of its user information in the current connection's session so that other hooks can also access it?

Panic with embedded broker

With the broker embedded into my app I get the following panic during testing, both when publishing directly (server.Publish) or using the paho client.

The i.ks list gets filled with *InflightMessage values (inflight_map.go:34), but when reading them back the code tries to cast an element to uint16 (inflight_map.go:32):

panic: interface conversion: interface {} is *clients.InflightMessage, not uint16

goroutine 50 [running]:
github.com/wind-c/comqtt/server/internal/clients.(*InflightMap).Set(0xc0007ca060, 0x21c, 0xc0006b1ce0)
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/internal/clients/inflight_map.go:32 +0x2fe
github.com/wind-c/comqtt/server.(*Server).publishToSubscribers(_, {{0x2b4, 0x3, 0x1, 0x0, 0x0}, {0x0, 0x0, 0x0}, {0x0, ...}, ...})
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/server.go:654 +0x498
github.com/wind-c/comqtt/server.(*Server).processPublish(_, _, {{0x2b4, 0x3, 0x1, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/process.go:149 +0x678
github.com/wind-c/comqtt/server.(*Server).processPacket(_, _, {{0x2b4, 0x3, 0x1, 0x0, 0x0}, {0x0, 0x0, 0x0}, ...})
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/process.go:28 +0x138
github.com/wind-c/comqtt/server/internal/clients.(*Client).Read(0xc0007cc000, 0xc0007f7940)
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/internal/clients/clients.go:401 +0x19d
github.com/wind-c/comqtt/server.(*Server).EstablishConnection(0xc000140370, {0xa1d4a4, 0x4}, {0xae60f8, 0xc00033e008}, {0xae2260?, 0xe8b340?})
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/server.go:362 +0x10f2
github.com/wind-c/comqtt/server/listeners.(*TCP).Serve.func1()
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/listeners/tcp.go:113 +0x3d
created by github.com/wind-c/comqtt/server/listeners.(*TCP).Serve
	/home/james/projects/web-mon/server/vendor/github.com/wind-c/comqtt/server/listeners/tcp.go:112 +0xea

The broker is started with

func startMqttBroker() *mqtt.Server {
	server := mqtt.NewServer(nil)
	tcp := listeners.NewTCP("tcp1", ":1883")
	err := server.AddListener(tcp, nil)
	if err != nil {
		log.Fatal(err)
	}
	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()
	return server
}
