Comments (6)
Loved the Codec implementation, let's start by implementing that in the first PR? I'll take this up if you are ok with it. We can go about this by dividing up the work.
from rumqtt.
Hey, please check if i understood this correctly:
- if incoming packet comes before the pending throttle ( in case if there were some pending requests ), the
next_request
future will be cancelled.- if the incoming packet keeps coming at higher frequency, it might not get chance to complete the
next_request
, thus starve. ( i mentioned packet not frame becausereadb
is resolved only when whole packet is read ).
Exactly.
my thoughts / questions:
pending_throttle
is0ms
by default and is configurable.- correct me if wrong, if we give priority to pending requests, that would mean we are starving on incoming packets right?
Both - the rx
and the sleep
- are polled in the select!
. So we cannot starve on the rx path but incoming packets can lead (as you wrote above) to starvation on the pending requests. Only the requests in the VecDec
are affected - not the requests from the channel.
A scenario where this can happen is when there are pending requests and a configured throttle. The client connects with clean_session
false
and instantly gets publications from the broker.
due to this, and my limited context, i think this might be a trade-off rather than an issue. If you think otherwise, can you please elaborate the context more?
Think it's just a bug in the implementation. The poll
fn doesn't store any context regarding the throttle. The sleep delay is always started from scratch. To solve this I'd create a Stream
of Requests
and like this and hold that stream in EventLoop
. The code behind the link has the same bug as main
. The throttled stream is created newly upon each call of poll
.
Getting that stream in EventLoop
is probably hard because of lifetime issues. To tackle this an option could be to spawn a task that loops on poll
and communicates the events via channel. The channel is fed directly from State
. This would also avoid the nasty Ok(self.state.events.pop_front().unwrap())
that feel odd in current implementation.
Such a task would also allow to interleave rx and tx similar to this. This all would be a rather big refactoring with a lot of room for errors.
from rumqtt.
Hey, please check if i understood this correctly:
- if incoming packet comes before the pending throttle ( in case if there were some pending requests ), the
next_request
future will be cancelled. - if the incoming packet keeps coming at higher frequency, it might not get chance to complete the
next_request
, thus starve. ( i mentioned packet not frame becausereadb
is resolved only when whole packet is read ).
my thoughts / questions:
pending_throttle
is0ms
by default and is configurable.- correct me if wrong, if we give priority to pending requests, that would mean we are starving on incoming packets right?
due to this, and my limited context, i think this might be a trade-off rather than an issue. If you think otherwise, can you please elaborate the context more?
thank you so much!
from rumqtt.
Loved the Codec implementation, let's start by implementing that in the first PR? I'll take this up if you are ok with it. We can go about this by dividing up the work.
I'm absolutely fine with that but think this is a general and architectural decision which must be approved within the rumqtt team.
Some notes:
The branch is probably a good starting point but needs something more:
- Ensure that enqueued packets are not discarded when the connection is reestablished. Framed::into_parts gives access to the buffers. Alternatively reuse the
Framed
instance fromNetwork
. - Solving the issue described above is probably more invasive. The stream of pending requests must be stored within
EventLoop
in order to retain the throttling across calls topoll
as described above. - Moving the loop into a task would require even more: Introduce a channel for the events (best use a
Sink
interface with explicit flushing to get less task switches).poll
would just asynchronically receive on the channel (Even better: implementStream
forEventLoop
).
from rumqtt.
I'm absolutely fine with that but think this is a general and architectural decision which must be approved within the rumqtt team.
Noted, we will discuss this and get back to you.
Alternatively reuse the Framed instance from Network.
That sounds like a great option, we could just as well re-establish a connection and use the buffers as it was from the previous connection, dropping corrupted/incomplete packet bytes.
Moving the loop into a task would require even more: Introduce a channel for the events (best use a Sink interface with explicit flushing to get less task switches). poll would just asynchronically receive on the channel (Even better: implement Stream for EventLoop).
Forgive me if my understanding is naive, but I believe it is better to not spawn a separate task and use something like a timer that doesn't get cancelled to handle this, we could ignore it and set it to never when nothing is pending?
from rumqtt.
Alternatively reuse the Framed instance from Network.
That sounds like a great option, we could just as well re-establish a connection and use the buffers as it was from the previous connection, dropping corrupted/incomplete packet bytes.
Hm. The tx
buffer should be fine to keep in total because it's flushed. The rx
buffer is likely to contain incomplete packets. I don't know the MQTT standard by heart but probably it would be fine the process the complete frames and drop the possible last incomplete. The implementation today discards an incomplete received frame upon network errors. Nothing you can do here...
Moving the loop into a task would require even more: Introduce a channel for the events (best use a Sink interface with explicit flushing to get less task switches). poll would just asynchronically receive on the channel (Even better: implement Stream for EventLoop).
Forgive me if my understanding is naive, but I believe it is better to not spawn a separate task and use something like a timer that doesn't get cancelled to handle this, we could ignore it and set it to never when nothing is pending?
Also possible and less invasive. This would be another select!
branch. The request_rx
branch is guarded with self.pending.is_empty()
.
from rumqtt.
Related Issues (20)
- rumqttc panics if network connection is closed by broker
- rumqttc: getting incorrect unsolicited acks error logs
- macOS ci is broken HOT 2
- rumqttc - AUTH packet support HOT 13
- rumqttc - ack with reason code HOT 3
- rumqttc: Could publish/subscribe methods directly return the msg id? HOT 2
- Does `MqttOptions` `request_channel_capacity` field do anything? HOT 3
- Keep the ACK ordering when using manual ACK
- Unexpected PUBACK Packet Sent to Reconnected Broker HOT 1
- rumqttc - How to connect to Mosquitto test broker with TLS HOT 2
- Connect with session_expiry_interval for MQTT v5 HOT 3
- client: how to reconnect with subscriptions? HOT 1
- quic protocol HOT 3
- rumqttc - Event ordering of puback is incorrect
- In mqttc v5, restore session only if session is resumed HOT 1
- InvalidCertificate(BadSignature) when using TLS with custom root certificate HOT 1
- Add customization options when using native-tls
- rumqttc: Reduce memory usage of MqttState
- Option to set the `tcp-nodelay` flag used for the connection from the MQTT client to the broker
- rumqttc: log `Outgoing::Connect` HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from rumqtt.