Comments (7)
I have solved this problem with this workaround:
- Decompress the payload
- Generate a new
packet.Message
, returnfalse
for current message so it won't be forwarded - Re-send the new
packet.Message
I think there should be an easier way to achieve this.
from gmqtt.
Hmm.. this workaround doesn't work, the resend messages aren't being delivered somehow.
Is there a way to do this?
from gmqtt.
So it turns out resending the message within the OnMsgArrived
hook doesn't work, somehow Publish()
seems to be stuck. I put the message into a channel and handling it from another place where the server instance is accessible, from there Publish
works just fine.
from gmqtt.
The message in OnMsgArrived
is readonly, you cannot modified it in OnMsgArrived
. Publish
should work in OnMsgArrived
, can you show me your codes?
Modify message in OnMsgArrived
seems reasonable. I will check if i can add this feature via OnMsgArrived
.
from gmqtt.
I have a struct defined as follows:
type Plugin struct {
service gmqtt.Server
}
And assigned service
on Load(service gmqtt.Server)
receiver function
func (p Plugin) Load(service gmqtt.Server) error {
p.service = service
return nil
}
Then in OnMsgArrived
func (p Plugin) OnMsgArrived(arrived gmqtt.OnMsgArrived) gmqtt.OnMsgArrived {
return func(ctx context.Context, client gmqtt.Client, msg packets.Message) bool {
if needsDecompression {
// decompress msg.Payload()
b := decompress(msg.Payload())
msg := gmqtt.NewMessage(msg.Topic(), b, msg.Qos())
// this line stucks somehow
p.service.PublishService().Publish(msg)
// return false here so the compressed payload won't be forwarded
return false
}
// forward the decompressed payload
return true
}
}
from gmqtt.
I test with the following code and it works as expected. You should use *Plugin
instead of Plugin
as the method receiver. @maddie
package main
import (
"context"
"log"
"net"
"os"
"os/signal"
"syscall"
"go.uber.org/zap"
"github.com/DrmagicE/gmqtt"
"github.com/DrmagicE/gmqtt/pkg/packets"
)
type Plugin struct {
s gmqtt.Server
}
func (p *Plugin) Load(service gmqtt.Server) error {
p.s = service
return nil
}
func (p *Plugin) Unload() error {
return nil
}
func (p *Plugin) HookWrapper() gmqtt.HookWrapper {
return gmqtt.HookWrapper{
OnMsgArrivedWrapper: func(arrived gmqtt.OnMsgArrived) gmqtt.OnMsgArrived {
return func(ctx context.Context, client gmqtt.Client, msg packets.Message) (valid bool) {
p.s.PublishService().Publish(gmqtt.NewMessage("/abc", []byte("testing"), 1))
return false
}
},
}
}
func (p *Plugin) Name() string {
return "testing"
}
func main() {
// listener
ln, err := net.Listen("tcp", ":1883")
if err != nil {
log.Fatalln(err.Error())
return
}
//l, _ := zap.NewProduction()
l, _ := zap.NewDevelopment()
s := gmqtt.NewServer(
gmqtt.WithTCPListener(ln),
gmqtt.WithLogger(l),
)
p := &Plugin{
s: s,
}
s.Init(gmqtt.WithPlugin(p))
s.Run()
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM)
<-signalCh
s.Stop(context.Background())
}
from gmqtt.
I see! That should be the problem.
Thanks a lot!
from gmqtt.
Related Issues (20)
- Any benchmark test data? HOT 2
- 有没有推荐的golang版本的mqtt客户端和gmqtt一起工作?或者gmqtt有没有计划处一个自己的client库? HOT 1
- Memory leak using OnBasicAuthWrapper Hook HOT 7
- websocket packet size over 1024 causing read hang HOT 1
- grpc publish retain 标志不起作用
- 1000客户端压测报错/gmqtt/persistence/queue/mem/mem.go:185 +0x118 HOT 1
- 三节点的集群,客户端发送消费二进制数据,节点挂掉了 HOT 2
- 可能存在内存泄漏 HOT 2
- panic: persistence factory: memory not found? HOT 1
- Modify topic before publishing
- 关于mqtts的问题 HOT 2
- Alternatives or sucessor of this project
- Old Docker Image
- 有时候存在消息未送达的情况
- 哪里空了啊
- 插件hook函数加载时间点优先于日志
- 测试无法保留会话,总会CleanSession,请问这块有做没有?是否是bug HOT 1
- 如何有重复id登录,是否有选项舍弃之前的id,而不是新的id自动拒绝? HOT 5
- ERROR server/client.go:273 connection lost {"client_id": "", "remote_addr": "113.89.11.200:21300", "error": "operation error: Code = 81, reasonString: "} HOT 1
- mqtt5协议的客户端无法获取clientId?
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from gmqtt.