Comments (11)
I think I understand how to handle the async stream by running tokio in a thread
You don't have to worry about tokio, as there will be a higher-level sync client (in the next release).
but the problem is the notifications - I need to crossbeam_channel::select! over multiple channels, one of them being the rumqtt notifications channel
I'm not sure what to do here. Rust's synchronous channels are fragmented across the ecosystem because no shared trait standardizes them. The moment I choose one channel library as a dependency, using its receivers in another channel library's select becomes difficult (someone else might have chosen flume channels).
But I agree that choosing crossbeam-channel is the safer bet, as it has become the de facto standard in the synchronous world. I'll probably use it for the high-level client.
from rumqtt.
Sync client sounds great, looking forward to it. I must admit that being intimidated by async / tokio was the main reason I used the deprecated rumqtt library to begin with.
from rumqtt.
Now there are no notification channels, just an iterator.
from rumqtt.
That's unfortunate, but my original idea of a new thread resending messages to a channel will still work. I admit the new code is quite elegant... time to try replacing rumqtt with this at last.
from rumqtt.
@MightyPork I think you can keep your current design
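// Iterate to poll the eventloop for connection progress
// (`tx` stands for a crossbeam_channel::Sender created by the caller)
for notification in connection.iter() {
    if let Some(notification) = notification {
        // Stop forwarding once the receiver has been dropped
        if tx.send(notification).is_err() {
            break;
        }
    }
}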
Is there a problem with this?
from rumqtt.
Creating that extra thread is not a huge deal, maybe my use case is unusual.
The design is like this:
/// Run (separated from start() so we can use 'self' and the borrow checker is less confused)
fn run(&mut self, shutdown_req: crossbeam_channel::Receiver<()>) -> anyhow::Result<()> {
    let fmq_notifs = self.inner.fmq.lock().notifications()?; // Get the notif channel from RUMQTT
    'recv: loop {
        /* manually construct a Select */
        let mut select = Select::new();
        let fmq_index = select.recv(&fmq_notifs); // <-- this is RUMQTT event channel
        let shutdown_index = select.recv(&shutdown_req);
        let user_index = select.recv(&self.user_events_recv);
        let delay = self.inner.event_scheduler.closest_deadline()
            .unwrap_or(Duration::from_secs(1440));
        let timeout_ch = Some(crossbeam_channel::after(delay));
        let timeout_index = select.recv(timeout_ch.as_ref().unwrap());
        // Select!
        match select.select() {
            op if op.index() == shutdown_index => {
                // ...
            }
            op if op.index() == timeout_index => {
                // ...
            }
            op if op.index() == user_index => {
                // ...
            }
            op if op.index() == fmq_index => {
                trace!("{} FMQ event", self.inner.name);
                match op.recv(&fmq_notifs) {
                    Ok(_) => {
                        // ...
                    },
                    Err(_) => {
                        error!("{} MQTT notification queue is dead!", self.inner.name);
                        // Client crashed
                        break 'recv;
                    }
                }
            }
            _ => unreachable!(),
        };
    }
    info!("{} shuts down...", self.inner.name);
    Ok(())
}
from rumqtt.
Yeah, looking at it, I already have a separate thread there - it detects the Re-connect RUMQTT event and re-subscribes to my topics. Is that still needed, or does it happen automatically in Rumq?
from rumqtt.
One problem I see with the new iterator API is that I can't wait for an item with timeout. timeout-iterator can solve that - by internally spawning that resender thread and creating a channel 😂
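A minimal sketch of that resender idea, using only the standard library so it runs without extra crates. The helper names `forward` and `next_with_timeout` are hypothetical and not part of rumqttc; any blocking iterator (for example the connection's notification iterator) is assumed to be movable into the helper thread.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Forward every item of a blocking iterator into a channel from a helper thread.
/// `forward` is a hypothetical helper, not part of rumqttc.
fn forward<I>(iter: I) -> Receiver<I::Item>
where
    I: Iterator + Send + 'static,
    I::Item: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for item in iter {
            // Stop once the receiving side has been dropped.
            if tx.send(item).is_err() {
                break;
            }
        }
        // `tx` is dropped here, so the receiver observes a disconnect.
    });
    rx
}

/// Wait for the next item, giving up after `timeout`.
/// Returns Ok(None) on timeout and Err(()) once the source iterator has ended.
fn next_with_timeout<T>(rx: &Receiver<T>, timeout: Duration) -> Result<Option<T>, ()> {
    match rx.recv_timeout(timeout) {
        Ok(item) => Ok(Some(item)),
        Err(RecvTimeoutError::Timeout) => Ok(None),
        Err(RecvTimeoutError::Disconnected) => Err(()),
    }
}
```

The same shape should carry over to crossbeam_channel, whose receivers also offer recv_timeout and can additionally take part in a Select.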
from rumqtt.
Timeouts and selects aren't easy with sync Rust. Previously, rumqtt spawned an internal thread and gave you a handle to the crossbeam channel rx. Now you should spawn this in a thread yourself. There is no difference in the number of threads.
thread::spawn(move || {
    // Iterate to poll the eventloop for connection progress
    // (`tx` stands for a crossbeam_channel::Sender created by the caller)
    for notification in connection.iter() {
        if let Some(notification) = notification {
            // Stop forwarding once the receiver has been dropped
            if tx.send(notification).is_err() {
                break;
            }
        }
    }
});
Timeouts are easy with async, though. But that depends on your comfort level with async; the barrier to entry is higher compared to sync.
from rumqtt.
Yeah, I see that the thread is unavoidable. What about the re-connects, I still have to send Subscribe again after it re-connects, right?
from rumqtt.
Yeah
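A rough sketch of what that re-subscribe loop could look like. Everything here is a stand-in: `Notification`, the `Subscribe` trait and `resubscribe_on_connect` are hypothetical names used for illustration, not rumqttc types, and the real client's notification and subscribe calls would have to be mapped onto them.

```rust
/// Hypothetical stand-in for the notifications the client yields.
#[derive(Debug, Clone, PartialEq)]
enum Notification {
    /// The connection was (re-)established.
    Connected,
    /// Any other notification; irrelevant for this sketch.
    Other,
}

/// Hypothetical abstraction over "send a Subscribe for this topic".
trait Subscribe {
    fn subscribe(&mut self, topic: &str);
}

/// Re-send a Subscribe for every topic each time a `Connected` notification arrives.
/// Returns how many times the subscriptions were (re-)issued.
fn resubscribe_on_connect<S, I>(client: &mut S, notifications: I, topics: &[&str]) -> usize
where
    S: Subscribe,
    I: IntoIterator<Item = Notification>,
{
    let mut rounds = 0;
    for notification in notifications {
        if notification == Notification::Connected {
            for topic in topics {
                client.subscribe(topic);
            }
            rounds += 1;
        }
    }
    rounds
}

/// Test double that records every topic it was asked to subscribe to.
struct Recorder(Vec<String>);

impl Subscribe for Recorder {
    fn subscribe(&mut self, topic: &str) {
        self.0.push(topic.to_string());
    }
}
```

In the forwarding-thread design discussed above, the same check could live inside that thread, so no additional thread would be needed just for re-subscribing.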
from rumqtt.