Giter Site home page Giter Site logo

Comments (11)

tekjar avatar tekjar commented on May 9, 2024

I think I understand how to handle the async stream by running tokio in a thread

You don't have to worry about tokio as there will be a higher level sync client (in the next release)

but the problem is the notifications - I need to crossbeam_channel::select! over multiple channels, one of them being the rumqtt notifications channel

I'm not sure what to do here. Rust's synchronous channels are broken ecosystem wise as they are not standardized by traits. The moment I choose one channel library as dependency, sitting in other channels select is difficult (someone else might have chosen flume channels).

But I agree that choosing crossbeam channel is a safer bet as it has become defacto in synchronous world. I'll probably use it for highlevel client

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

Sync client sounds great, looking forward to it. I must admit being intimidated by async / tokio was the main reason I used the deprecated rumqtt library to begin with..

from rumqtt.

tekjar avatar tekjar commented on May 9, 2024

Now there are no notification channels. Just an iterator

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

That's unfortunate, but my original idea of new thread resending messages to a channel will still work. I admit the new code is quite elegant. ... time to try replacing rumqtt with this at last

from rumqtt.

tekjar avatar tekjar commented on May 9, 2024

@MightyPork I think you can keep your current design

    // Iterate to poll the eventloop for connection progress
    for (i, notification) in connection.iter().enumerate() {
        if let Some(notification) = notification {
            crossbeam_channel.send(notification)
        }
    }

Is there a problem with this?

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

Creating that extra thread is not a huge deal, maybe my use case is unusual.

The design is like this:

/// Run (separated from start() so we can use 'self' and the borrow checker is less confused)
fn run(&mut self, shutdown_req : crossbeam_channel::Receiver<()>) -> anyhow::Result<()> {
    let fmq_notifs = self.inner.fmq.lock().notifications()?; // Get the notif channel from RUMQTT

    'recv: loop {
        /* manually construct a Select */

        let mut select = Select::new();

        let fmq_index = select.recv(&fmq_notifs); // <-- this is RUMQTT event channel

        let shutdown_index = select.recv(&shutdown_req);

        let user_index = select.recv(&self.user_events_recv)

        let delay = self.inner.event_scheduler.closest_deadline()
            .unwrap_or(Duration::from_secs(1440));
        let timeout_ch = Some(crossbeam_channel::after(delay));
        let timeout_index = select.recv(timeout_ch.as_ref().unwrap());

        // Select!
        match select.select() {
            op if op.index() == shutdown_index => {
                // ...
            }

            op if op.index() == timeout_index => {
                // ...
            }

            op if op.index() == user_index => {
                // ...
            }

            op if op.index() == fmq_index => {
                trace!("{} FMQ event", self.inner.name);

                match op.recv(&fmq_notifs) {
                    Ok(_) => {
                        // ...
                    },
                    Err(_) => {
                        error!("{} MQTT notification queue is dead!", self.inner.name);
                        // Client crashed
                        break 'recv;
                    }
                }
            }
            _ => unreachable!(),
        };
    }

    info!("{} shuts down...", self.inner.name);
    Ok(())
}

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

Yeah, looking at it, I already have a separate thread there - it's detects the Re-connect RUMQTT event and re-subscribes to my topics. Is that still needed, or it happens automatically in Rumq?

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

One problem I see with the new iterator API is that I can't wait for an item with timeout. timeout-iterator can solve that - by internally spawning that resender thread and creating a channel 😂

from rumqtt.

tekjar avatar tekjar commented on May 9, 2024

Timeout and selects aren't easy with sync rust. Previously rumqtt spawns internal thread and gives you handle of crossbeam channel rx. Now you should spawn this in a thread. There is no difference regarding number of threads

    thread::spawn(move || {
        // Iterate to poll the eventloop for connection progress
        for (i, notification) in connection.iter().enumerate() {
            if let Some(notification) = notification {
                crossbeam_channel.send(notification)
            }
        }
    })

Timeout's are easy with async though. But that depends on your comfort levels with async though. Barrier of entry is higher when compared to sync

from rumqtt.

MightyPork avatar MightyPork commented on May 9, 2024

yeah I see that the thread is unavoidable. What about the re-connects, I still have to send Subscribe again after it re-connects, right?

from rumqtt.

tekjar avatar tekjar commented on May 9, 2024

Yeah

from rumqtt.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.