bytebeamio / rumqtt

The MQTT ecosystem in rust

License: Apache License 2.0

rumqtt's Introduction

What is rumqtt?

rumqtt is an open-source set of libraries written in Rust that implement the MQTT standard while striving to be simple, robust and performant.

Crate    Description                                 Version
rumqttc  A high-level, easy-to-use MQTT client       crates.io page
rumqttd  A high-performance, embeddable MQTT broker  crates.io page

Installation and Usage

rumqttd

Run using docker

rumqttd can be used with docker by pulling the image from docker hub as follows:

docker pull bytebeamio/rumqttd

To run rumqttd docker image you can simply run:

docker run -p 1883:1883 -p 1884:1884 -it bytebeamio/rumqttd

Or you can run rumqttd with a custom config file by mounting the file and passing it as an argument:

docker run -p 1883:1883 -p 1884:1884 -v /absolute/path/to/rumqttd.toml:/rumqttd.toml -it bytebeamio/rumqttd -c /rumqttd.toml

Prebuilt binaries

For prebuilt binaries, check out our releases, download a suitable binary for your system, and move it to any directory in your PATH.


Install using cargo

cargo install --git https://github.com/bytebeamio/rumqtt rumqttd

Download the demo config file

curl --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/bytebeamio/rumqtt/main/rumqttd/rumqttd.toml > rumqttd.toml

and run the broker using

rumqttd --config rumqttd.toml

Note: Make sure to use the correct rumqttd.toml file for your specific version of rumqttd.


Install using AUR

paru -S rumqttd-bin

Replace paru with whichever AUR helper you are using.

Note: The configuration is found in /etc/rumqtt/config.toml and the systemd service is named rumqtt.service.


Compile from source

Clone the repo using git clone.

git clone --depth=1 https://github.com/bytebeamio/rumqtt/

Change directory to that folder and run

cd rumqtt
cargo run --release --bin rumqttd -- -c rumqttd/rumqttd.toml -vvv

For more information, see rumqttd's README.

rumqttc

Add rumqttc to your project using

cargo add rumqttc

For more information, see rumqttc's README.

Features

rumqttd

  • MQTT 3.1.1
  • QoS 0 and 1
  • Connection via TLS
  • Retransmission after reconnect
  • Last will
  • Retained messages
  • QoS 2
  • MQTT 5

rumqttc

  • MQTT 3.1.1
  • MQTT 5

Community

Contributing

Please follow the code of conduct when opening issues to report bugs or contributing fixes. Also read our contributor guide to get a better idea of what we'd appreciate and what we won't.

License

This project is released under The Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0)

rumqtt's People

Contributors

123vivekr, abhikjain360, amokfa, amrx101, arunanshub, brianmay, carlocorradini, de-sh, dependabot[bot], edjopato, flxo, gautambt, henil, initerworker, jaredwolff, jplatte, mnpw, nathansizemore, nickztar, qm3ster, qwandor, railwaycat, ravenger, skreborn, swanandx, tekjar, therealprof, thoralf-m, wiktorsikora, yatinmaan

rumqtt's Issues

crossbeam_channel option for notifications / migrating from rumqtt

Hi, I'm trying to migrate a client application from rumqtt (that I keep in a local copy, manually updating its dependencies - but I hit a wall today with the "ring" dependency of rustls / webpki).

I think I understand how to handle the async stream by running tokio in a thread, but the problem is the notifications - I need to crossbeam_channel::select! over multiple channels, one of them being the rumqtt notifications channel.

So far, the only solution I came up with is to run a thread that receives messages from the tokio channel and re-sends them to a crossbeam channel, but it seems dirty. What do you think?

(I really don't want to rewrite everything using tokio::sync::mpsc)
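
For illustration, the forwarding-thread workaround described above could look roughly like the sketch below. It is not rumqtt's API: both ends use std::sync::mpsc as stand-ins (the source would be the notification receiver and the sink a crossbeam_channel sender in the real application), and spawn_bridge and forward_all are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Forwards every item from `source` to `sink` on a dedicated thread.
/// The thread exits when `source` is closed or `sink` has no receiver left.
fn spawn_bridge<T: Send + 'static>(source: Receiver<T>, sink: Sender<T>) -> JoinHandle<()> {
    thread::spawn(move || {
        // `iter()` blocks until an item arrives and ends once all senders are dropped.
        for item in source.iter() {
            if sink.send(item).is_err() {
                break; // the consuming side went away
            }
        }
    })
}

/// Pushes `items` through a bridge and collects what arrives on the far side.
fn forward_all(items: Vec<String>) -> Vec<String> {
    let (src_tx, src_rx) = channel();
    let (dst_tx, dst_rx) = channel();
    let bridge = spawn_bridge(src_rx, dst_tx);
    for item in items {
        src_tx.send(item).expect("bridge thread is alive");
    }
    drop(src_tx); // closing the source lets the bridge thread finish
    bridge.join().expect("bridge thread panicked");
    // The bridge dropped its sender when it exited, so this iterator terminates.
    dst_rx.iter().collect()
}
```

The bridge costs one extra thread and one extra hop per notification, but it keeps the select! over multiple channels on the consumer side unchanged.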

Fix perf regression in client

There is a perf degradation in commit 07614a5

Id = rumqtt-0
            Outgoing publishes : Received = 1000000 Throughput = 330141.97 messages/s
            Incoming publishes : Received = 0       Throughput = NaN messages/s
            Reconnects         : 0

vs

commit c099f42

Id = rumqtt-0
            Outgoing publishes : Received = 1000000 Throughput = 674308.8 messages/s
            Incoming publishes : Received = 0       Throughput = NaN messages/s
            Reconnects         : 0

I'm using mqttwrk to test this

Steps to reproduce

  • cd rumqttd && cargo run --release
  • cd mqttwrk && cargo run --release

Subscribe with certificates

I am trying to subscribe to a topic with certificates, but I am getting this error:
Err(Network(Io(Custom { kind: InvalidData, error: WebPKIError(BadDER) })))
I am not sure I am configuring the MqttOptions correctly. Here is what I did:

let ca: Vec<u8> = fs::read("/etc/mosquitto/certs/mqtt-ca.crt").expect("Something went wrong reading certificate!");
let mut mqttoptions = MqttOptions::new("air", "mysite.com", 8884);
mqttoptions.set_ca(ca);
mqttoptions.set_credentials("my_username", "my_password");
mqttoptions
	.set_keep_alive(5)
	.set_throttle(Duration::from_secs(2));

let (mut client, mut connection) = Client::new(mqttoptions, 10);
client.subscribe("read/+/meas", QoS::ExactlyOnce).unwrap();

// Iterate to poll the eventloop for connection progress
for (i, notification) in connection.iter().enumerate() {
	println!("Notification {} = {:?}", i, notification);
}

For reference, this is the mosquitto command I am using, which works:
mosquitto_sub --cafile /etc/mosquitto/certs/mqtt-ca.crt -h mysite.com -p 8884 -u 'my_username' -P 'my_password' -t 'read/+/error' -v

As I am also new to Rust, I am not sure the CA should be provided the way I did.

Any idea what I could be doing wrong?

TLS works on OSX, fails on yocto linux

Expected behaviour

Connecting to an MQTT broker via TLS works on all platforms

Actual behaviour

Succeeds on Darwin:

2020-08-20 10:01:56.213 - DEBUG - Received a message from MQTT broker: (Some(Connected), None)

Fails on Yocto linux:

2020-08-20 11:53:00.441 - ERROR - Connection error: Network(Io(Custom { kind: InvalidData, error: WebPKIError(UnknownIssuer) }))

It's the exact same code, where we read the CA if present:

let ca = fs::read(&mqttpub_config.ca_pem_path).await.map_err(|e| {
    let error = format!(
        "Could not read CA file \"{}\": {}",
        mqttpub_config.ca_pem_path, e
    );
    io::Error::new(io::ErrorKind::Other, error)
})?;
mqttoptions.set_ca(ca);

Further investigation

I observed that on both platforms, rustls is using different TLSv1.3 encrypted extensions:

# OSX
2020-08-20 10:01:56.123 - DEBUG - rustls::client::tls13 - TLS1.3 encrypted extensions: []
# Linux / yocto
2020-08-20 10:01:56.123 - DEBUG - rustls::client::tls13 - TLS1.3 encrypted extensions: [ServerNameAck]

Keep alive ping while connection is still active

Something I tested that affects both rumqttc and the previous rumq-client crate: the client pings the broker while the data connection is still in use.

From the documentation, the set_keep_alive method defines:

Set number of seconds after which client should ping the broker if there is no other data exchange

But in testing, pings are sent while subscribed and receiving data, and while publishing.
I found this because the application errored when it did not receive a ping response, probably because the broker limits the number of messages it handles for the client.

As a temporary measure, I just poll the eventloop and log any error:

match eventloop.poll().await {
  Ok(_notification) => {}
  Err(e) => { error!("{:?}", e); }
}

and I increased the keep_alive value so it pings less frequently.
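
To make the documented behaviour concrete, here is a minimal sketch of an idle-based keep-alive check. It is an assumption about how such a timer could be structured, not the crate's implementation; KeepAlive and its methods are hypothetical names.

```rust
use std::time::{Duration, Instant};

/// Tracks the last time any packet was sent or received (hypothetical helper).
struct KeepAlive {
    interval: Duration,
    last_activity: Instant,
}

impl KeepAlive {
    fn new(interval: Duration, now: Instant) -> Self {
        KeepAlive { interval, last_activity: now }
    }

    /// Call whenever a packet is sent or received; this postpones the next ping.
    fn record_activity(&mut self, now: Instant) {
        self.last_activity = now;
    }

    /// A ping is due only when the connection has been idle for a full interval.
    fn ping_due(&self, now: Instant) -> bool {
        now.duration_since(self.last_activity) >= self.interval
    }
}
```

With this shape, a busy connection never triggers a ping, which matches the quoted description of set_keep_alive.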

Reconnection issue

Hi, I'm in the process of migrating from the rumqtt client to this library and running into reconnection problems during testing. When I disconnect the client from the network the broker is on I get the following error:

{
  "file": "/build/.cargo/registry/src/github.com-1ecc6299db9ec823/rumq-client-0.1.0-alpha.4/src/state.rs",
  "level": "ERROR",
  "line": 263,
  "message": "Error awaiting for last ping response",
  "module_path": "rumq_client::state",
  "target": "rumq_client::state",
  "time": "2020-02-12T14:30:57.027601409+00:00"
}

However when I reconnect the client to the network I continue getting the same error and the client doesn't reconnect. Based on the logs I'm not receiving a Reconnection or Disconnection notification, but with each error I am getting a StreamEnd:

{
  "file": "app/src/mqtt/mod.rs",
  "level": "DEBUG",
  "line": 551,
  "message": "notification: StreamEnd(MqttState(AwaitPingResp))",
  "module_path": "app::mqtt",
  "target": "app::mqtt",
  "time": "2020-02-12T15:28:10.837594182+00:00"
}

In case I've done something wrong here is my code for setting up the client options.

    fn rumq_options(&self) -> rumq_client::MqttOptions {
        let mut options = rumq_client::MqttOptions::new(&self.client_id, &self.host, self.port);
        options
            .set_keep_alive(10)
            .set_throttle(Duration::from_secs(1))
            .set_clean_session(false)
            .set_request_channel_capacity(128)
            .set_notification_channel_capacity(128);

        if let Some(user_password) = &self.user_password {
            options.set_credentials(&user_password.user, &user_password.password);
        }
        if let Some(tls) = &self.tls {
            options.set_ca(tls.ca_pem.clone());
            // mqttoptions.set_client_auth(client_cert, client_key);
        }
        options
    }

And my code for spawning the tasks for sending/receiving.

    /// Start asynchronous loops to handle events.
    pub async fn start(self) -> (task::JoinHandle<()>, task::JoinHandle<()>) {
        let mqtt = Arc::new(self);

        let (mut requests_tx, requests_rx) = channel(10);
        let mut eventloop = eventloop(mqtt.options.rumq_options(), requests_rx);

        mqtt.metric_connected().set(1);
        mqtt.rumq_subscribe(&mut requests_tx).await;

        let mqtt1 = mqtt.clone();
        let mut requests_tx1 = requests_tx.clone();
        let poll_task = task::spawn(async move {
            loop {
                let mqtt = mqtt1.clone();
                mqtt.poll_handler(&mut requests_tx1).await;
                time::delay_for(Duration::from_secs(1)).await;
            }
        });

        let notification_task = task::spawn(async move {
            loop {
                let mqtt = mqtt.clone();
                let mut stream = eventloop.stream();
                while let Some(item) = stream.next().await {
                    mqtt.notification_handler(item, &mut requests_tx).await;
                }
                time::delay_for(Duration::from_secs(1)).await;
            }
        });

        (poll_task, notification_task)
    }

If there's any other information I can provide let me know. Thanks for the help and the great library, migrating was much easier than I expected and the new interface works very well with my other async code 👍

rumqttc "Stream done" after 70 minutes, no messages on reconnect?

Hi there.

I am trying to run the following code (actually taken from the syncpubsub example), just wrapped in a loop (in an attempt to reconnect).

I am using rumqttc "0.0.4" and used "cross" to compile on macOS (via Rust 1.43.1) for ARM, testing on a Raspberry Pi 4.
The code runs fine for a while, consuming and processing MQTT messages. However, after about 70 minutes the invoke value resolves to an Error("Stream done"). I tried to wrap the connection.iter in a loop hoping it would reconnect, which seems to have worked. However, it no longer receives messages from the subscribed topic.

Any idea what I am doing wrong? By the way, you can find the whole code here

Snippet:

let (mut client, mut connection) = Client::new(mqtt_options, 10);

client
    .subscribe(event_topic.as_str(), QoS::AtMostOnce)
    .expect("Failed to subscribe to mqtt event topic");

loop {
    for (_i, invoke) in connection.iter().enumerate() {
        // Log connection errors and keep iterating.
        let (inc, _out) = match invoke {
            Ok(pair) => pair,
            Err(e) => {
                error!("mqtt error {}", e);
                continue;
            }
        };

        // Only incoming publishes are handled; everything else is skipped.
        if let Some(Incoming::Publish(message)) = inc {
            let payload = str::from_utf8(&message.payload);
            if payload.is_err() {
                error!("Failed to decode mqtt payload {:?}", payload);
                continue;
            }
            let payload = payload.unwrap();

            /* some business logic here.. */
        }
    }
}

thx

Compilation error

When using rumq-client (version alpha.5) in Cargo.toml, it fails to compile because of a number of missing functions in tokio, such as time::throttle...

Environment: Windows 10
Toolchain: stable-x86_64-pc-windows-msvc

Is this a rewrite of rumqtt?

We started using rumqtt at work last year, and now I see that it has been archived and ownership has been transferred here. Since this is in alpha state, is it a rewrite?

rustfmt everything

My editor is set to automatically run rustfmt on save. However, the rumqtt code hasn't been run through rustfmt before, so this always generates a bunch of spurious changes which I have to revert. How would people feel about running rustfmt over everything, to avoid this problem in future?

Alternatively, it might be possible to add a rustfmt.toml which disables it.

Option to set `MTU`.

We had an issue where the network we were trying to talk to had an MTU set to a value smaller than the payload we were trying to send.

OpenSSL and Go set the MTU to the minimum threshold, whereas rustls sets the MTU to the maximum value allowed by the spec. rustls does, however, expose a method to set the MTU. We need to use it and provide an option to set the MTU that is passed down to rustls.

References

Expose `Rustls`

There can be a great many config options for rustls. It's not practical to use MqttOptions for all rustls-related configs. A more pragmatic option would be for users to create their own ClientConfig with all the desired settings.

eventloop.poll() inside async function/spawned task causes panic

Summary

When using eventloop.poll() inside of async function/spawned task it causes the program to panic. The error is as follows:

thread 'tokio-runtime-worker' panicked at 'removal index (is 0) should be < len (is 0)', src/liballoc/vec.rs:1057:13

What's expected

The thread should start executing as expected.

Steps to reproduce

Put the async example for the client inside of an async function and spawn it from main

The function

   // .. do stuff here to get CA certs

   // Create the options for the Mqtt client
    let mut opt = MqttOptions::new("server", host, port.parse::<u16>().unwrap());
    opt.set_keep_alive(5);
    opt.set_ca(ca_cert_buf);
    opt.set_client_auth(server_cert_buf, private_key_buf);

    let mut eventloop = EventLoop::new(opt, 10).await;
    let tx = eventloop.handle();

   // .. spawn loop function for communicating with separate message broker..
   // .. (tx used here)

    loop {
        if let Ok((incoming, outgoing)) = eventloop.poll().await {
            println!("Incoming = {:?}, Outgoing = {:?}", incoming, outgoing);
        };
    }

In main itself:

let task = task::spawn(mqtt_run());

Version info

Latest commit f94e554d95

impl Error for EventLoopError

Just saw that EventLoopError doesn't implement std::error::Error. Is this a known limitation or something I can easily add?
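
As a rough idea of what adding the trait involves, the sketch below implements std::error::Error for a stand-in enum. The variants shown are assumptions made for illustration; the real EventLoopError may look different.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical stand-in; the real `EventLoopError` variants may differ.
#[derive(Debug)]
enum EventLoopError {
    Io(io::Error),
    StreamDone,
}

impl fmt::Display for EventLoopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            EventLoopError::Io(e) => write!(f, "I/O error: {}", e),
            EventLoopError::StreamDone => write!(f, "stream done"),
        }
    }
}

impl Error for EventLoopError {
    /// Exposes the wrapped error so callers can walk the cause chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            EventLoopError::Io(e) => Some(e),
            EventLoopError::StreamDone => None,
        }
    }
}

/// With the trait implemented, `?` can convert the error into `Box<dyn Error>`.
fn fail() -> Result<(), Box<dyn Error>> {
    let result: Result<(), EventLoopError> = Err(EventLoopError::StreamDone);
    result?;
    Ok(())
}
```

The trait only requires Debug and Display; source is optional and is worth adding where a variant wraps another error.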

native tls support

rustls still has some limitations, some old certificates are not supported.

Do you have a plan to add native-tls as a feature?

How to get message content?

I have successfully subscribed to a topic and received a message, but I can't figure out how to get the content of the message?
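
Other snippets on this page read the payload field of an incoming publish as raw bytes and decode it with str::from_utf8. A minimal sketch of that step is below; the Publish struct here is a hypothetical stand-in with only the fields needed for the example, not the crate's type.

```rust
use std::str;

/// Hypothetical stand-in for an incoming publish: a topic plus raw payload bytes.
struct Publish {
    topic: String,
    payload: Vec<u8>,
}

/// Returns the payload as text, or `None` if it is not valid UTF-8.
fn payload_text(publish: &Publish) -> Option<&str> {
    str::from_utf8(&publish.payload).ok()
}
```

Binary payloads (for example Protocol Buffers) should be handed to their own decoder as bytes rather than converted to text.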

Documentation or example

Hi,
Is there any example or documentation on how to set up an MQTT broker with authentication handling using rumq-broker?

Please transfer ownership of old rumqtt repo

Hi,

Thank you for your hard work. I still have projects using the old rumqtt and would like to (very) passively maintain it (e.g. bump some dependencies, apply a few trivial merge requests).
Would it be possible for you to transfer ownership of the GitHub repo and the crate to me?

Renaming and package structure plan

I'm planning to rename this repo to rumqtt. I've waited long enough for my previous employer to respond regarding the ownership transfer. I had planned to migrate the code once that happened, but I'm just going to rename instead.

Coming to the structure, this is what I've in mind (mostly copied from tokio)

rumqtt
- mqtt4bytes # bare-bones, no_std-compatible MQTT 4 serialization and deserialization
- mqtt5bytes # bare-bones, no_std-compatible MQTT 5 serialization and deserialization
- rumqttc # client library (along with high-level APIs)
- rumqttd # embeddable broker library
- rumqtt # wraps rumqttc and rumqttd with feature flags

Additionally, a rumqtt binary that can do what the mosquitto command-line utilities do, and more:

rumqtt client {options}
rumqtt broker {options}
rumqtt test 

mqtt4bytes and mqtt5bytes versions will increase independently while rumqttc, rumqttd and rumqtt will be lock-stepped

Also a (rate limited) public broker will be hosted at broker.rumqtt.com

Current design and notes for high level client

I'll use this issue to articulate my thoughts on the current design and on what the high-level client should look like to make the most of the client with little effort. This issue won't be closed.

The current async API looks like this

let (requests_tx, requests_rx) = channel(10);
let mut eventloop = eventloop(mqttoptions, requests_rx);

// connect and poll
let mut stream = eventloop.connect().await.unwrap();
while let Some(item) = stream.next().await {
    println!("Received = {:?}", item);
}

This pattern offers a lot of flexibility

eventloop() takes request Stream

Because Stream is a trait, the library does not have to make any assumptions about the request pattern. For example:

  • streams can be both bounded and unbounded
  • plug in a disk backed Stream for robustness across reboots
  • attach a stream which select!s internally to tackle request priorities
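
The request-priority idea in the last bullet can be illustrated without any async machinery. The sketch below is only an approximation under stated assumptions: it uses non-blocking std::sync::mpsc receivers in place of a Stream that select!s internally, and next_request is a hypothetical helper name.

```rust
use std::sync::mpsc::Receiver;

/// Returns the next request, always draining `high` before `low`.
/// Yields `None` when neither queue has anything ready.
fn next_request<T>(high: &Receiver<T>, low: &Receiver<T>) -> Option<T> {
    // `try_recv` never blocks; an empty or closed high-priority queue
    // falls through to the low-priority one.
    high.try_recv().or_else(|_| low.try_recv()).ok()
}
```

A strict priority order like this can starve the low-priority queue under sustained load, so a real request stream might prefer some fairness rule.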

connect returns a Stream

This stream does everything MQTT. A poll on this stream will do the following internally

  • Take user request, handle state w.r.t this user request, create bytes and put it on network
  • Ping based on incoming and outgoing activity. Make sure that the broker doesn't disconnect
  • Supervision to detect half-open connections and disconnect
  • Flow control based on in-flight messages so that RAM usage does not blow up (very much possible when incoming acks are slow and during the time it takes to detect a half-open connection)
  • Read incoming packet and respond back to the broker
  • Yield incoming packets to the user
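
The flow-control step in the list above can be pictured as a bounded window of unacknowledged packet ids. This is a simplified sketch, not the library's state machine; InflightWindow and its methods are hypothetical names.

```rust
use std::collections::VecDeque;

/// Bounded window of unacknowledged packet ids (hypothetical helper).
struct InflightWindow {
    max_inflight: usize,
    pending: VecDeque<u16>,
}

impl InflightWindow {
    fn new(max_inflight: usize) -> Self {
        InflightWindow { max_inflight, pending: VecDeque::new() }
    }

    /// Records an outgoing publish. Returns false when the window is full,
    /// signalling the caller to stop taking new requests until acks arrive.
    fn try_send(&mut self, pkid: u16) -> bool {
        if self.pending.len() >= self.max_inflight {
            return false;
        }
        self.pending.push_back(pkid);
        true
    }

    /// Frees a slot when the broker acknowledges `pkid`.
    /// Returns whether that id was actually pending.
    fn ack(&mut self, pkid: u16) -> bool {
        match self.pending.iter().position(|&id| id == pkid) {
            Some(index) => {
                self.pending.remove(index);
                true
            }
            None => false,
        }
    }
}
```

Because the window is bounded, memory use stays proportional to max_inflight even when acks are slow.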

NOTE: These are necessary for a robust, persistent mqtt connection. I've seen a lot of implementations ignoring these only to get bitten at some point of time. Even if you are using different clients from other languages, keep these in mind

Now this Stream can be plugged into async or sync code as users see fit.

state separated from user Stream

But the important thing here is that MQTT state is separated from the stream. The stream takes state as a reference. This allows users to freely create and destroy connections and resume from where they left off. This makes reconnections simple: just wrap the above code in a loop

loop {
    let mut stream = eventloop.connect().await.unwrap();
    while let Some(item) = stream.next().await {
        println!("Received = {:?}", item);
    }
}

Again, how the client should reconnect (10 times or infinitely) is left to the user.
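
As one possible user-side policy, a bounded retry loop might look like the sketch below. The connect closure is a hypothetical stand-in for whatever establishes the connection, and connect_with_retries is not part of the library.

```rust
/// Calls `connect` until it succeeds or `max_attempts` is reached.
/// Returns the number of attempts used on success, or the last error.
/// At least one attempt is always made.
fn connect_with_retries<E>(
    max_attempts: u32,
    mut connect: impl FnMut() -> Result<(), E>,
) -> Result<u32, E> {
    let mut attempt = 0;
    loop {
        attempt += 1;
        match connect() {
            Ok(()) => return Ok(attempt),
            // Give up and surface the last error once the budget is spent.
            Err(e) if attempt >= max_attempts => return Err(e),
            // A real client would likely sleep here before retrying.
            Err(_) => continue,
        }
    }
}
```

Passing u32::MAX as max_attempts approximates the "retry forever" policy, while a small number gives the "10 times" style.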

high level client

All these features are great if you have sophisticated use cases, but not everyone has to understand all the details when they start. They can reach this stage of customization eventually, and when they do, the solution is within reach.

Getting started with the library should be fun and easy. Users shouldn't have to know that they should poll the eventloop Stream with tokio, read about async Rust, or understand why there are 3 (and not 1) steps to start the eventloop

rough sketch of client APIs

This spawns a background thread where the eventloop runs. tx is used to send MQTT requests and rx is used to receive notifications, just like a channel

let (tx, rx) = rumq_client::spawn_eventloop(options)

And this if eventloop should run on current thread

let eventloop = rumq_client::eventloop(options);
let (tx, rx) = eventloop.handles();
eventloop.run()

Weird TLS error when payload size is large.

  • Replicated on 2 machines with 2 different brokers (Mosquitto, EMQX).
  • Can be replicated once the payload size is increased to 4096.

Set up description:

  1. A broker with two-way TLS enabled.
  2. rumqttc client with CA chain and client auth.

First start off with any small payload size. Then try a payload size >= 4096, and the following error occurs:

ERROR rustls::session > TLS alert received: Message { typ: Alert, version: TLSv1_3, payload: Alert( AlertMessagePayload { level: Fatal, description: InternalError, }, ), }

Sample code for client configuration

use std::fs::File;
use std::io::Read;

use rumqttc::MqttOptions;

fn set_tls(
    mqttoptions: &mut MqttOptions,
    ca_file: Option<String>,
    client_file: Option<String>,
    client_key: Option<String>,
    use_ssl: i16,
) {
    let mut ca_data: Vec<u8> = Vec::new();
    let mut cert_data = Vec::new();
    let mut key_data = Vec::new();

    // Unwrap or handle?
    match ca_file {
        Some(file_path) => {
            let mut ca_file = File::open(file_path).unwrap();
            ca_file.read_to_end(&mut ca_data).unwrap();
        }
        None => println!("No ca file provided"),
    }
    mqttoptions.set_ca(ca_data);

    match client_file {
        Some(file_path) => {
            let mut cert_file = File::open(file_path).unwrap();
            cert_file.read_to_end(&mut cert_data).unwrap();
        }
        None => println!("No client_cert provided"),
    }
    match client_key {
        Some(file_path) => {
            let mut key_file = File::open(file_path).unwrap();
            key_file.read_to_end(&mut key_data).unwrap();
        }
        None => println!("No client key provided"),
    }

    // Client authentication is only configured when use_ssl == 2.
    if use_ssl == 2 {
        mqttoptions.set_client_auth(cert_data, key_data);
    }
}

mqttwrk can also be used (please use the forked one mentioned in the link, as that doesn't unwrap on event loop poll):

./target/debug/mqttwrk -s <server_addr> -P <server_port> -t 2 -R <path to Ca-Chain> -C <path to client cert> -K <path to client key> -c 1 -p 4096

Function and method names should be more self-declaring

Example:

rumq_client::publish does not publish anything; it creates a Publish object that later has to be sent to the event stream. The same goes for rumq_client::subscribe.

It would make more sense to have them as Publish::new(...) and Subscribe::new(..), or just rename them to create_publish and create_subscribe.

There are numerous such issues that should be addressed before a 1.0 release, to make it easier for newcomers to navigate the library. Let's collect those issues here and discuss them.

LastWill message is String instead of Vec<u8>

I am creating a Sparkplug™ B client as my first big Rust project. The Sparkplug™ B specification requires the LastWill message to be a binary Protocol Buffers type, but the type of the message on rumqttc's LastWill is String.

Is there any other way to create the connection using a Vec<u8> or Bytes or something of that nature as the data type for the message field of LastWill struct?
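
To show what is being asked for, here is a sketch of a will message that stores raw bytes. It is a hypothetical shape, not rumqttc's LastWill; the field names and the constructor are assumptions for illustration.

```rust
/// Hypothetical variant of `LastWill` whose message is raw bytes rather than `String`.
#[derive(Debug, Clone, PartialEq)]
struct LastWill {
    topic: String,
    message: Vec<u8>,
}

impl LastWill {
    /// Accepts anything convertible to bytes, so text and binary payloads both work.
    fn new(topic: impl Into<String>, message: impl Into<Vec<u8>>) -> Self {
        LastWill {
            topic: topic.into(),
            message: message.into(),
        }
    }
}
```

Existing callers that pass a &str or String would keep compiling with such a signature, since both convert into Vec<u8>.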

Websocket support?

Is it possible to use websockets (both secure and not) with this?

Could you document an example if so?
Thanks for your work!

v1 release

Things to do before the 1.0 release in the first week of May

mqtt4bytes

  • Check whether allocations can be removed completely for no_std
  • Finalize APIs

rumqttc

  • High-level, channel-like interface for the client (spawns a thread but hides tokio)
  • Finalize APIs

rumqttd

  • A very basic broker which can be embedded
  • Commit log finalization
  • Clustered mode tested on 3 nodes

commandline

  • Some mosquitto-like options
  • Brew package

Related to #64

Experiment with concurrent writes

Network reads and channel reads (for user requests) are orchestrated concurrently with select!. The function abstracting this select returns packets to write to the network. But this network write is in a synchronous path, which might be affecting throughput because network reads cannot happen during this time. Find ways to fix this.

Builders for Request are too limiting

First of all: I like the new design - thanks for all the work.

The builders used to create Requests feel too limiting for certain use cases.
Here are some examples:

  • subscribe only allows passing a single topic. During mass-subscribe, I have a Vec of topics already. It would be nice to have a builder which allows me to pass this directly.
  • Publish uses an Arc<Vec<u8>> as payload, but the builder does not allow passing an Arc. I've got an Arc already, as I'm publishing the same message in a loop. It would be nice to avoid the clones.

I'm not really sure what's the best way to design such an API. But maybe we can use this issue to collect some ideas/use cases.
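
One idea for the collection, sketched under assumptions: constructors that take generic inputs so that an existing Vec of topics or an existing Arc payload can be passed straight through. The Subscribe and Publish types below are hypothetical stand-ins, not the crate's request types.

```rust
use std::sync::Arc;

/// Hypothetical subscribe request holding any number of topics.
#[derive(Debug)]
struct Subscribe {
    topics: Vec<String>,
}

impl Subscribe {
    /// Accepts any iterable of topic-like values, including an existing `Vec`.
    fn new<I, S>(topics: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        Subscribe {
            topics: topics.into_iter().map(Into::into).collect(),
        }
    }
}

/// Hypothetical publish request with a shared payload.
#[derive(Debug)]
struct Publish {
    topic: String,
    payload: Arc<Vec<u8>>,
}

impl Publish {
    /// Takes an owned `Vec<u8>` or an existing `Arc<Vec<u8>>`;
    /// in the latter case the payload bytes are not copied.
    fn new(topic: impl Into<String>, payload: impl Into<Arc<Vec<u8>>>) -> Self {
        Publish {
            topic: topic.into(),
            payload: payload.into(),
        }
    }
}
```

Publishing the same message in a loop then only needs Arc::clone, which bumps a reference count instead of cloning the bytes.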

Persisting in-flight messages to disk

Dear all,

we're currently using the rumqtt crate but have hit kind of a road-block with that, and since that crate is archived we are considering switching to this crate.

The problem we're facing is that we need guaranteed message delivery, even when the software crashes or the system reboots, hence we must persist outgoing messages to disk and remove them only when we receive an acknowledgement from the broker.
There was some discussion in the (locked) issue tracker of rumqtt where the main author @tekjar stated that they prefer to implement such a functionality outside of the crate. However, we were not able to do this, since there seems to be no way of knowing when a message is ack'ed by the broker.

So the question is: would this be possible to implement using rumq? Or, even better, does anybody have a tip on how to make this work using the "old" rumqtt crate? Sadly, we could not ask over there since the issue tracker is locked...

Best wishes, and thank you very much!
Michael
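
For discussion, the persistence scheme described above (store before sending, delete on acknowledgement) could be sketched as follows. It assumes the client surfaces acknowledgements with their packet ids, which is exactly the open question; Outbox and its methods are hypothetical, and a production version would also need to fsync and handle packet id reuse.

```rust
use std::fs;
use std::io;
use std::path::PathBuf;

/// Hypothetical disk-backed outbox: one file per unacknowledged packet id.
struct Outbox {
    dir: PathBuf,
}

impl Outbox {
    fn open(dir: impl Into<PathBuf>) -> io::Result<Self> {
        let dir = dir.into();
        fs::create_dir_all(&dir)?;
        Ok(Outbox { dir })
    }

    fn path_for(&self, pkid: u16) -> PathBuf {
        self.dir.join(format!("{}.msg", pkid))
    }

    /// Persists the payload; meant to be called before the message is sent.
    fn store(&self, pkid: u16, payload: &[u8]) -> io::Result<()> {
        fs::write(self.path_for(pkid), payload)
    }

    /// Deletes the payload once the broker has acknowledged it.
    fn ack(&self, pkid: u16) -> io::Result<()> {
        fs::remove_file(self.path_for(pkid))
    }

    /// Packet ids still awaiting an acknowledgement, e.g. after a restart.
    fn pending(&self) -> io::Result<Vec<u16>> {
        let mut ids = Vec::new();
        for entry in fs::read_dir(&self.dir)? {
            let name = entry?.file_name();
            let id = name
                .to_str()
                .and_then(|n| n.strip_suffix(".msg"))
                .and_then(|n| n.parse::<u16>().ok());
            if let Some(id) = id {
                ids.push(id);
            }
        }
        ids.sort_unstable();
        Ok(ids)
    }
}
```

After a crash or reboot, pending() lists what still has to be re-sent.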

Build broken because of tokio::select

I'm super confused. When pulling in rumq-client as a dependency along with tokio in an otherwise completely empty project:

[dependencies]
tokio = "0.2"
rumq-client = { git = "https://github.com/tekjar/rumq.git" }

I get a strange compilation error regarding tokio::select.

~/asdf ‹master*› cargo build
    Updating git repository `https://github.com/tekjar/rumq.git`
    Updating crates.io index
   Compiling rumq-client v0.1.0-alpha.6 (https://github.com/tekjar/rumq.git#b0adb18b)
error[E0432]: unresolved import `tokio::select`
 --> /Users/felix/.cargo/git/checkouts/rumq-df81ca706f7bfd29/b0adb18/rumq-client/src/eventloop.rs:6:5
  |
6 | use tokio::select;
  |     ^^^^^^^^^^^^^ no `select` in the root

error: cannot determine resolution for the macro `select`
   --> /Users/felix/.cargo/git/checkouts/rumq-df81ca706f7bfd29/b0adb18/rumq-client/src/eventloop.rs:120:21
    |
120 |                     select! {
    |                     ^^^^^^
    |
    = note: import resolution is stuck, try simplifying macro imports

error: aborting due to 2 previous errors

For more information about this error, try `rustc --explain E0432`.
error: could not compile `rumq-client`.

@mojzu had something similar in #34. Pinning the tokio dependency to the latest 0.2.13 ends up with the same error. The toolchain is the latest stable, and rumq builds fine when built standalone.

I tracked down the change that causes the error to #42 which changes nothing regarding deps, versions etc...

Any ideas?

Notification enum should not contain Connected

The Notification enum should not contain the Connected variant, since this can never happen once the stream is created. It should not be mixed with the other MQTT stream logic, since it will only happen once, and only as the first thing when creating the eventloop.

When creating the eventloop or stream this should be reported back in the form of a Result<Connected, ConnectionError>

Example:

let mut eventstream = eventloop(mqtt_options, rx).stream();

// This will always be the first event in a successful connection.
if let Some(Notification::Connected) = eventstream.next().await {
    while let Some(notif) = eventstream.next().await {
        match notif {
            Notification::Connected => {
                // This can never happen again, and should thus not be part of this enum.
                // Having it here bloats the user logic.
            }
            _ => {
                // All interesting events.
            }
        }
    }
}
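
The proposed shape, with the handshake outcome reported once as a Result and no Connected variant on the stream, might look like the sketch below. All types and the connect function are hypothetical illustrations of the proposal, not the crate's API.

```rust
/// Marker returned once when the handshake succeeds (hypothetical).
#[derive(Debug, PartialEq)]
struct Connected;

/// Hypothetical connection failure.
#[derive(Debug, PartialEq)]
enum ConnectionError {
    Refused,
}

/// Stream events after the handshake; note there is no `Connected` variant.
#[derive(Debug, PartialEq)]
enum Notification {
    Publish(String),
    Disconnected,
}

/// Stand-in for the connect step: the outcome is reported once, as a `Result`.
fn connect(broker_accepts: bool) -> Result<Connected, ConnectionError> {
    if broker_accepts {
        Ok(Connected)
    } else {
        Err(ConnectionError::Refused)
    }
}

/// User logic only has to handle events that can actually occur on the stream.
fn describe(event: &Notification) -> String {
    match event {
        Notification::Publish(topic) => format!("publish on {}", topic),
        Notification::Disconnected => "disconnected".to_string(),
    }
}
```

The match in describe is exhaustive without a dead Connected arm, which is the simplification the issue asks for.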

Sending a Request::Disconnect to event loop handle causes panic

If I do something like this:

let event_loop = EventLoop::new(mqtt_options, REQUESTS_CAP).await;
let requests_tx = event_loop.handle();
requests_tx.send(Request::Disconnect).await;

It panics at state.rs:138 because it is unimplemented.

I'll send a patch which seems to fix this.
