Comments (12)
yield
It is the key feature for running multiple asynchronous operations concurrently.
See https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/composition/coroutine.html
However, the Boost example alone is not enough to understand it well.
passing object instead of a callback
You can pass an object that has an operator() matching the callback signature.
So the struct impl has all the operator() overloads it needs:
// forwarding callbacks
void operator()() {
proc({}, {}, {}, {}, reason::others);
}
void operator()(boost::system::error_code const& ec, reason r = reason::others) {
proc(ec, {}, {}, {}, r);
}
void operator()(boost::system::error_code ec, as::ip::tcp::resolver::results_type eps) {
proc(ec, {}, {}, std::move(eps), reason::others);
}
void operator()(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
proc(ec, {}, {}, {}, reason::others);
}
void operator()(am::system_error const& se) {
proc({}, se, {}, {}, reason::others);
}
void operator()(am::packet_variant pv) {
proc({}, {}, am::force_move(pv), {}, reason::by_recv);
}
I added enum class reason as the last parameter. I will explain it later.
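To see the "one object, many operator() overloads" idea in isolation, here is a minimal sketch that does not use Boost.Asio or async_mqtt at all. The names handler, start, wait, receive, and demo_overloads are hypothetical stand-ins invented for this illustration; the three free functions only imitate async functions that call back with different signatures.

```cpp
#include <cassert>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical stand-in for a handler object like impl:
// one object, several operator() overloads, all forwarding to proc().
struct handler {
    explicit handler(std::vector<std::string>& log) : log_{&log} {}

    void operator()() { proc("no-arg"); }
    void operator()(std::error_code const& ec) {
        proc(ec ? "error_code(error)" : "error_code(ok)");
    }
    void operator()(std::string payload) { proc("payload:" + payload); }

private:
    // Single place where every completion ends up.
    void proc(std::string what) { log_->push_back(std::move(what)); }

    std::vector<std::string>* log_;  // non-owning; must outlive the handler
};

// Hypothetical "async" functions, each with a different callback signature.
// They complete immediately to keep the sketch small.
template <typename Handler> void start(Handler h) { h(); }
template <typename Handler> void wait(Handler h) { h(std::error_code{}); }
template <typename Handler> void receive(Handler h) { h(std::string{"hello"}); }

std::vector<std::string> demo_overloads() {
    std::vector<std::string> log;
    handler h{log};
    start(h);    // picks operator()()
    wait(h);     // picks operator()(std::error_code const&)
    receive(h);  // picks operator()(std::string)
    assert(log.size() == 3);  // one entry per overload invoked
    return log;
}
```

The same object can be handed to every function because overload resolution picks the operator() that matches the arguments each function calls back with.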
initiate the event
// Recv and pub loop
yield {
app_.amep.recv(*this);
app_.tim.expires_after(std::chrono::seconds(1));
app_.tim.async_wait(as::append(*this, reason::by_timer));
}
You need to start both recv() and the timer. To do that, you can use the yield {} notation: you can initiate multiple async functions inside a single yield {} block.
However, it is unpredictable which event will happen first: receiving a packet or the timer firing.
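If the control flow is hard to picture, the following hand-written state machine may help. It is only a rough imitation of what reenter/yield do, under the assumption that the handler is invoked once per completed operation; mini_coro, start_recv, start_timer, and run_mini_coro are hypothetical names, and no real I/O happens.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class reason { by_timer, by_recv, others };

// Hand-written imitation of a stackless coroutine:
// `state` remembers where to resume on the next call.
struct mini_coro {
    int state = 0;
    int in_flight = 0;  // operations started but not yet completed
    std::vector<std::string> log;

    void start_recv()  { ++in_flight; log.push_back("start recv"); }
    void start_timer() { ++in_flight; log.push_back("start timer"); }

    // Called once to start, then once per completed operation.
    void operator()(reason r = reason::others) {
        switch (state) {
        case 0:
            // Roughly: yield { recv(...); async_wait(...); }
            start_recv();
            start_timer();
            state = 1;
            return;
        case 1:
            // Roughly the body of: while (true) yield { switch (r) ... }
            if (r == reason::by_timer) {
                --in_flight;
                log.push_back("timer fired");
                start_timer();  // re-arm only the operation that completed
            }
            else if (r == reason::by_recv) {
                --in_flight;
                log.push_back("packet received");
                start_recv();
            }
            return;
        }
    }
};

mini_coro run_mini_coro() {
    mini_coro c;
    c();                      // both operations are now in flight
    assert(c.in_flight == 2);
    c(reason::by_recv);       // completions may arrive in either order
    c(reason::by_timer);
    return c;
}
```

Each completion restarts only its own operation, so there are always exactly two operations in flight: one recv and one timer wait.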
steady_timer::async_wait() takes the following completion token, which is called WaitToken.
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/basic_waitable_timer/async_wait.html
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/reference/WaitToken.html
It is simple: the only handler parameter is const boost::system::error_code& ec.
recv() completes with am::packet_variant pv.
But const boost::system::error_code& ec is used for other purposes as well.
So I introduced the reason enum.
enum class reason {
by_timer,
by_recv,
others
};
void operator()(boost::system::error_code const& ec, reason r = reason::others) {
proc(ec, {}, {}, {}, r);
}
void operator()(am::packet_variant pv) {
proc({}, {}, am::force_move(pv), {}, reason::by_recv);
}
void proc(
boost::system::error_code const& ec,
am::system_error const& se,
am::packet_variant pv,
std::optional<as::ip::tcp::resolver::results_type> eps,
reason r
) {
Now, if proc() is called because recv() completed, reason r is set to by_recv; otherwise it is set to others.
To detect that the timer fired, we need to pass by_timer as r to the following operator().
void operator()(boost::system::error_code const& ec, reason r = reason::others) {
proc(ec, {}, {}, {}, r);
}
How to do that?
Use append.
See https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/composition/token_adapters.html#boost_asio.overview.composition.token_adapters.append
app_.tim.async_wait(as::append(*this, reason::by_timer));
When you call an async function with append(), the parameter reason::by_timer is appended as the last argument of the completion handler. Then
void operator()(boost::system::error_code const& ec, reason r = reason::others) {
proc(ec, {}, {}, {}, r);
}
is called with r set to reason::by_timer.
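As a rough mental model, appending an argument can be imitated with a small wrapper around the handler. The sketch below is not Boost.Asio's implementation; append_args, fake_async_wait, and the handler struct are hypothetical names used only to show the effect of passing extra values after the original arguments (C++17 is assumed for std::apply).

```cpp
#include <cassert>
#include <system_error>
#include <tuple>
#include <utility>

enum class reason { by_timer, by_recv, others };

// Simplified imitation of an "append" adapter: wrap a handler so that
// the extra values are passed after the original completion arguments.
template <typename Handler, typename... Extra>
auto append_args(Handler h, Extra... extra) {
    return [h = std::move(h),
            extras = std::make_tuple(std::move(extra)...)](auto&&... args) mutable {
        std::apply(
            [&](auto&... e) { h(std::forward<decltype(args)>(args)..., e...); },
            extras
        );
    };
}

// Same shape as the operator() in the text: reason defaults to others.
struct handler {
    reason* last;  // where the received reason is stored
    void operator()(std::error_code const&, reason r = reason::others) { *last = r; }
};

// Hypothetical timer wait: completes immediately with a success code.
template <typename Handler>
void fake_async_wait(Handler h) { h(std::error_code{}); }

reason wait_plain() {
    reason r = reason::by_recv;
    fake_async_wait(handler{&r});  // only ec is passed, so r becomes others
    return r;
}

reason wait_appended() {
    reason r = reason::others;
    fake_async_wait(append_args(handler{&r}, reason::by_timer));
    return r;
}

void check_append_demo() {
    assert(wait_plain() == reason::others);
    assert(wait_appended() == reason::by_timer);
}
```

The wrapped handler is still called with the usual error code first; the appended value simply fills the trailing reason parameter instead of its default.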
from async_mqtt.
This code could help you https://github.com/redboltz/async_mqtt/blob/main/tool/client_cli.cpp
If you still have questions, please write a simpler spec, e.g.
- Client c1
  - subscribe topic1 QoS0
  - publish topic2 QoS0 every second
- Client c2
  - ...
from async_mqtt.
Additional information:
The benchmarking tool has a timer-driven publish:
Lines 603 to 702 in 3b2b047
It is a little complicated...
from async_mqtt.
Thanks for the information. I was able to run the examples. I got some ideas from example/ep_cb_mqtt_client.cpp, but I am still unsure how to use it. Is there a simple publish and subscribe example, like other libraries have, so that I can test the library?
from async_mqtt.
Here is some example code:
This is a simple connect -> subscribe -> publish example using future:
https://github.com/redboltz/async_mqtt/blob/main/example/ep_future_mqtt_client.cpp
This is a simple connect -> subscribe -> publish example using Boost.Asio's stackless coroutine:
https://github.com/redboltz/async_mqtt/blob/main/example/ep_slcoro_mqtt_client.cpp
You need to learn Boost.Asio basics before using async_mqtt.
For example:
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/model.html
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/model/async_ops.html
https://www.boost.org/doc/libs/1_82_0/doc/html/boost_asio/overview/composition.html
from async_mqtt.
Additional example:
This is a connect -> subscribe -> infinite recv loop example.
It is not part of the example directory.
NOTE: If multiple clients use the same username and client_id, the broker disconnects the previous connection (if you use the async_mqtt broker). This is typical broker behavior. To avoid this problem, use a unique client_id for each connection.
// Copyright Takatoshi Kondo 2023
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <iostream>
#include <optional>
#include <string>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <async_mqtt/all.hpp>
namespace as = boost::asio;
namespace am = async_mqtt;
#include <boost/asio/yield.hpp>
struct app {
app(as::ip::tcp::resolver& res,
std::string host,
std::string port,
std::string topic_filter,
am::qos qos,
am::endpoint<am::role::client, am::protocol::mqtt>& amep
):res{res},
host{std::move(host)},
port{std::move(port)},
topic_filter{std::move(topic_filter)},
qos{qos},
amep{amep}
{
impl_();
}
friend struct impl;
struct impl : as::coroutine {
impl(app& a):app_{a} {
}
// forwarding callbacks
void operator()() {
proc({}, {}, {}, {});
}
void operator()(boost::system::error_code const& ec) {
proc(ec, {}, {}, {});
}
void operator()(boost::system::error_code ec, as::ip::tcp::resolver::results_type eps) {
proc(ec, {}, {}, std::move(eps));
}
void operator()(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
proc(ec, {}, {}, {});
}
void operator()(am::system_error const& se) {
proc({}, se, {}, {});
}
void operator()(am::packet_variant pv) {
proc({}, {}, am::force_move(pv), {});
}
private:
void proc(
boost::system::error_code const& ec,
am::system_error const& se,
am::packet_variant pv,
std::optional<as::ip::tcp::resolver::results_type> eps
) {
reenter (*this) {
std::cout << "start" << std::endl;
// Resolve hostname
yield app_.res.async_resolve(app_.host, app_.port, *this);
std::cout << "async_resolve:" << ec.message() << std::endl;
if (ec) return;
// Layer
// am::stream -> TCP
// Underlying TCP connect
yield as::async_connect(
app_.amep.next_layer(),
*eps,
*this
);
std::cout
<< "TCP connected ec:"
<< ec.message()
<< std::endl;
if (ec) return;
// Send MQTT CONNECT
yield app_.amep.send(
am::v3_1_1::connect_packet{
true, // clean_session
0, // keep_alive
am::allocate_buffer("cid1"),
am::nullopt, // will
am::nullopt, // username set like am::allocate_buffer("user1"),
am::nullopt // password set like am::allocate_buffer("pass1")
},
*this
);
if (se) {
std::cout << "MQTT CONNECT send error:" << se.what() << std::endl;
return;
}
// Recv MQTT CONNACK
yield app_.amep.recv(*this);
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::connack_packet const& p) {
std::cout
<< "MQTT CONNACK recv"
<< " sp:" << p.session_present()
<< std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT CONNACK recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
// Send MQTT SUBSCRIBE
yield app_.amep.send(
am::v3_1_1::subscribe_packet{
*app_.amep.acquire_unique_packet_id(),
{ {am::allocate_buffer(app_.topic_filter), app_.qos} }
},
*this
);
if (se) {
std::cout << "MQTT SUBSCRIBE send error:" << se.what() << std::endl;
return;
}
// Recv MQTT SUBACK
yield app_.amep.recv(*this);
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::suback_packet const& p) {
std::cout
<< "MQTT SUBACK recv"
<< " pid:" << p.packet_id()
<< " entries:";
for (auto const& e : p.entries()) {
std::cout << e << " ";
}
std::cout << std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT SUBACK recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
// Recv loop
while (true) {
yield app_.amep.recv(*this);
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::publish_packet const& p) {
std::cout
<< "MQTT PUBLISH recv"
<< " pid:" << p.packet_id()
<< " topic:" << p.topic()
<< " payload:" << am::to_string(p.payload())
<< " qos:" << p.opts().get_qos()
<< " retain:" << p.opts().get_retain()
<< " dup:" << p.opts().get_dup()
<< std::endl;
},
[&](am::v3_1_1::puback_packet const& p) {
std::cout
<< "MQTT PUBACK recv"
<< " pid:" << p.packet_id()
<< std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
}
}
}
app& app_;
};
as::ip::tcp::resolver& res;
std::string host;
std::string port;
std::string topic_filter;
am::qos qos;
am::endpoint<am::role::client, am::protocol::mqtt>& amep;
impl impl_{*this};
};
#include <boost/asio/unyield.hpp>
int main(int argc, char* argv[]) {
am::setup_log(am::severity_level::trace);
if (argc != 5) {
std::cout << "Usage: " << argv[0] << " host port topic_filter qos" << std::endl;
return -1;
}
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
am::protocol_version::v3_1_1,
ioc.get_executor()
};
auto qos = boost::lexical_cast<int>(argv[4]);
if (qos < 0 || qos > 2) {
std::cout << "invalid qos:" << qos << " it should be 0,1,or 2" << std::endl;
return -1;
}
app a{res, argv[1], argv[2], argv[3], static_cast<am::qos>(qos), amep};
ioc.run();
}
from async_mqtt.
Thanks for the code. I am able to run it. I previously worked with Paho and Mosquitto, where the event loop handles all the network handling and I can run the publisher and subscriber concurrently. As I am not used to this type of implementation, I am facing difficulties. In the ep_cb_mqtt_client.cpp code, after publishing, the client waits for the subscription, but my requirement is that the publisher and subscriber run independently.
- Client c1
  - subscribe topic2 QoS0 (receive the data from client2)
  - publish topic1 QoS0 for each second
- Client c2
  - subscribe topic1 QoS0 (receive the data from client1)
  - publish topic2 QoS0 (send the data of topic1 to topic2)
I have looked into the examples. I have not found any example (maybe I missed it) where the publisher and subscriber work simultaneously. Do you have any example with this kind of implementation, or any example with a threading mechanism?
To build the code in the tool folder, I ran cmake and then make, but got this error:
/usr/bin/ld: cannot find -lasync_mqtt_iface: No such file or directory
collect2: error: ld returned 1 exit status
make[2]: *** [CMakeFiles/bench.dir/build.make:98: bench] Error 1
make[1]: *** [CMakeFiles/Makefile2:104: CMakeFiles/bench.dir/all] Error 2
make: *** [Makefile:136: all] Error 2
There is no error when I build the code in the example folder.
from async_mqtt.
Now I understand what you want to do.
If I understand correctly, you want to receive packets and send publishes concurrently, and you think you need multithreading for that.
The answer is that you don't need multithreading.
Here is the whole code that does what you want.
// Copyright Takatoshi Kondo 2023
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <iostream>
#include <optional>
#include <string>
#include <boost/asio.hpp>
#include <boost/lexical_cast.hpp>
#include <async_mqtt/all.hpp>
namespace as = boost::asio;
namespace am = async_mqtt;
#include <boost/asio/yield.hpp>
struct app {
enum class reason {
by_timer,
by_recv,
others
};
app(as::ip::tcp::resolver& res,
std::string host,
std::string port,
std::string cid,
std::string pub_topic,
std::string sub_topic,
std::string payload,
am::endpoint<am::role::client, am::protocol::mqtt>& amep
):res{res},
host{std::move(host)},
port{std::move(port)},
cid{std::move(cid)},
pub_topic{std::move(pub_topic)},
sub_topic{std::move(sub_topic)},
payload{std::move(payload)},
amep{amep}
{
impl_();
}
friend struct impl;
struct impl : as::coroutine {
impl(app& a):app_{a} {
}
// forwarding callbacks
void operator()() {
proc({}, {}, {}, {}, reason::others);
}
void operator()(boost::system::error_code const& ec, reason r = reason::others) {
proc(ec, {}, {}, {}, r);
}
void operator()(boost::system::error_code ec, as::ip::tcp::resolver::results_type eps) {
proc(ec, {}, {}, std::move(eps), reason::others);
}
void operator()(boost::system::error_code ec, as::ip::tcp::endpoint /*unused*/) {
proc(ec, {}, {}, {}, reason::others);
}
void operator()(am::system_error const& se) {
proc({}, se, {}, {}, reason::others);
}
void operator()(am::packet_variant pv) {
proc({}, {}, am::force_move(pv), {}, reason::by_recv);
}
private:
void proc(
boost::system::error_code const& ec,
am::system_error const& se,
am::packet_variant pv,
std::optional<as::ip::tcp::resolver::results_type> eps,
reason r
) {
reenter (*this) {
std::cout << "start" << std::endl;
// Resolve hostname
yield app_.res.async_resolve(app_.host, app_.port, *this);
std::cout << "async_resolve:" << ec.message() << std::endl;
if (ec) return;
// Layer
// am::stream -> TCP
// Underlying TCP connect
yield as::async_connect(
app_.amep.next_layer(),
*eps,
*this
);
std::cout
<< "TCP connected ec:"
<< ec.message()
<< std::endl;
if (ec) return;
// Send MQTT CONNECT
yield app_.amep.send(
am::v3_1_1::connect_packet{
true, // clean_session
0, // keep_alive
am::allocate_buffer(app_.cid),
am::nullopt, // will
am::nullopt, // username set like am::allocate_buffer("user1"),
am::nullopt // password set like am::allocate_buffer("pass1")
},
*this
);
if (se) {
std::cout << "MQTT CONNECT send error:" << se.what() << std::endl;
return;
}
// Recv MQTT CONNACK
yield app_.amep.recv(*this);
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::connack_packet const& p) {
std::cout
<< "MQTT CONNACK recv"
<< " sp:" << p.session_present()
<< std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT CONNACK recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
// Send MQTT SUBSCRIBE
yield app_.amep.send(
am::v3_1_1::subscribe_packet{
*app_.amep.acquire_unique_packet_id(),
{ {am::allocate_buffer(app_.sub_topic), am::qos::at_most_once} }
},
*this
);
if (se) {
std::cout << "MQTT SUBSCRIBE send error:" << se.what() << std::endl;
return;
}
// Recv MQTT SUBACK
yield app_.amep.recv(*this);
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::suback_packet const& p) {
std::cout
<< "MQTT SUBACK recv"
<< " pid:" << p.packet_id()
<< " entries:";
for (auto const& e : p.entries()) {
std::cout << e << " ";
}
std::cout << std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT SUBACK recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
// Recv and pub loop
yield {
app_.amep.recv(*this);
app_.tim.expires_after(std::chrono::seconds(1));
app_.tim.async_wait(as::append(*this, reason::by_timer));
}
while (true) yield {
switch (r) {
case reason::by_timer:
// Send MQTT PUBLISH
app_.amep.send(
am::v3_1_1::publish_packet{
am::allocate_buffer(app_.pub_topic),
am::allocate_buffer(app_.payload),
am::qos::at_most_once
},
*this
);
app_.tim.expires_after(std::chrono::seconds(1));
app_.tim.async_wait(as::append(*this, reason::by_timer));
break;
case reason::by_recv:
if (pv) {
pv.visit(
am::overload {
[&](am::v3_1_1::publish_packet const& p) {
std::cout
<< "MQTT PUBLISH recv"
<< " pid:" << p.packet_id()
<< " topic:" << p.topic()
<< " payload:" << am::to_string(p.payload())
<< " qos:" << p.opts().get_qos()
<< " retain:" << p.opts().get_retain()
<< " dup:" << p.opts().get_dup()
<< std::endl;
},
[](auto const&) {}
}
);
}
else {
std::cout
<< "MQTT recv error:"
<< pv.get<am::system_error>().what()
<< std::endl;
return;
}
app_.amep.recv(*this);
break;
case reason::others:
// e.g. send() completed; there is nothing new to start here
break;
}
}
}
}
app& app_;
};
as::ip::tcp::resolver& res;
std::string host;
std::string port;
std::string cid;
std::string pub_topic;
std::string sub_topic;
std::string payload;
am::endpoint<am::role::client, am::protocol::mqtt>& amep;
as::steady_timer tim{amep.strand()};
impl impl_{*this};
};
#include <boost/asio/unyield.hpp>
int main(int argc, char* argv[]) {
am::setup_log(am::severity_level::trace);
if (argc != 7) {
std::cout << "Usage: " << argv[0] << " host port cid pub_topic sub_topic payload" << std::endl;
return -1;
}
as::io_context ioc;
as::ip::tcp::socket resolve_sock{ioc};
as::ip::tcp::resolver res{resolve_sock.get_executor()};
am::endpoint<am::role::client, am::protocol::mqtt> amep {
am::protocol_version::v3_1_1,
ioc.get_executor()
};
app a{res, argv[1], argv[2], argv[3], argv[4], argv[5], argv[6], amep};
ioc.run();
}
Usage:
./a.out localhost 1883 cid1 topic1 topic2 from_cid1
At another terminal:
./a.out localhost 1883 cid2 topic2 topic1 from_cid2
I will explain the key points of the code in the next comment, but first, please try it.
from async_mqtt.
If you want to use a callback-based approach instead of stackless coroutines (yield), you can do that.
I don't want to write a whole callback example because I recommend the yield approach, but I can show you conceptual pseudocode.
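// Timer chain: wait one second, publish, then re-arm itself.
std::function<void()> publish_proc =
[&] {
tim.expires_after(std::chrono::seconds(1));
tim.async_wait(
[&] (boost::system::error_code const& ec) {
if (ec) return; // timer cancelled or failed: stop this chain
amep.send(publish_packet{...});
publish_proc();
}
);
};
// Receive chain: handle one packet, then wait for the next one.
std::function<void()> recv_proc =
[&] {
amep.recv(
[&](am::packet_variant pv) {
...
recv_proc();
}
);
};
// Start both chains; they run independently on the same event loop.
publish_proc();
recv_proc();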
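The pseudocode above cannot be compiled as-is, so here is a runnable toy version of the same pattern with no networking. mini_loop is a hypothetical stand-in for an event loop such as io_context, and run_two_chains is a name made up for this sketch; each chain re-posts itself a fixed number of times instead of waiting on a real timer or socket.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded event loop: a FIFO of ready handlers.
struct mini_loop {
    std::deque<std::function<void()>> ready;

    void post(std::function<void()> f) { ready.push_back(std::move(f)); }

    void run() {
        while (!ready.empty()) {
            auto f = std::move(ready.front());
            ready.pop_front();
            f();  // a handler may post more work, like re-arming a timer
        }
    }
};

// Two independent self-re-arming chains sharing one loop and one thread.
std::vector<std::string> run_two_chains(int rounds) {
    assert(rounds >= 0);
    mini_loop loop;
    std::vector<std::string> log;
    int pub_left = rounds;
    int recv_left = rounds;

    std::function<void()> publish_proc = [&] {
        if (pub_left-- == 0) return;  // stop after `rounds` publishes
        loop.post([&] { log.push_back("publish"); publish_proc(); });
    };
    std::function<void()> recv_proc = [&] {
        if (recv_left-- == 0) return;  // stop after `rounds` receives
        loop.post([&] { log.push_back("recv"); recv_proc(); });
    };

    publish_proc();
    recv_proc();
    loop.run();  // both chains make progress interleaved, on one thread
    return log;
}
```

Calling run_two_chains(2) interleaves the two chains without any extra thread, which is the point of the callback approach: each chain only schedules its own next step.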
from async_mqtt.
Thank you so much for the explanations. The code is now working properly!
from async_mqtt.
I have some general questions about the library to find compatibility for my project. If you can answer the following questions that would be really helpful:
- Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?
- Is there any dynamic memory allocation (new, delete)?
- Does the library have threading model or single thread execution?
- How are the exception handled? (Exception handling should not throw and end execution)
from async_mqtt.
I have some general questions about the library to find compatibility for my project. If you can answer the following questions that would be really helpful:
- Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?
I don't know. async_mqtt supports the MQTT v5.0 and v3.1.1 protocols.
- Is there any dynamic memory allocation (new, delete)?
Yes. Not only async_mqtt but also Boost.Asio and other parts allocate memory dynamically.
- Does the library have threading model or single thread execution?
The async_mqtt library doesn't create any threads.
But that doesn't mean the library is only for single-threaded use.
It is up to the user.
For example, the broker application https://github.com/redboltz/async_mqtt/blob/main/tool/broker.cpp uses the async_mqtt library on multiple threads.
The CLI application https://github.com/redboltz/async_mqtt/blob/main/tool/client_cli.cpp uses async_mqtt on a single thread (int main()).
- How are the exception handled? (Exception handling should not throw and end execution)
async_mqtt throws exceptions. https://github.com/search?q=repo%3Aredboltz%2Fasync_mqtt+%22throw+%22&type=code
Users need to catch and handle them.
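One generic way to keep an exception from ending the program is to wrap the call that drives the event loop in try/catch. The sketch below is plain C++ with no async_mqtt calls; run_guarded is a hypothetical helper, and its run_loop argument is a stand-in for something like a lambda that calls ioc.run(). Which calls can actually throw depends on the library version, so treat this only as a pattern.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>

// Runs `run_loop` and converts any escaping exception into an error
// string, so the caller can log it, reconnect, or shut down cleanly.
// Returns an empty string when run_loop finishes without throwing.
std::string run_guarded(std::function<void()> const& run_loop) {
    try {
        run_loop();
        return "";
    }
    catch (std::exception const& e) {
        return e.what();
    }
    catch (...) {
        return "unknown exception";
    }
}

void check_run_guarded() {
    assert(run_guarded([] {}).empty());
    assert(run_guarded([] { throw std::runtime_error("boom"); }) == "boom");
}
```

A caller would pass something like [&] { ioc.run(); } as run_loop and decide what to do when the returned string is not empty.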
from async_mqtt.