Comments (9)
The easiest workaround is just to put the entire handler is a separate coroutine and start it with ensure_future
async def on_message(message):
result = await long_running_handler(topic, payload)
do_something(result)
async with Client("test.mosquitto.org") as client:
async with client.filtered_messages("floors/+/humidity") as messages:
await client.subscribe("floors/#")
async for message in messages:
asyncio.ensure_future(on_message(message))
Admittedly, I'm new to asyncio and this took far too long to figure out. Seems obvious in retrospect now. I'm leaving this open in case anyone has other suggestions, but this fix works for me.
from aiomqtt.
Great, that makes sense. So my confusion stemmed from the fact that the Javascript async-mqtt library uses an async network callback function. This allows me to use await without 'blocking' the callback from being called as soon as the next message arrives.
Obviously standard Python does not have an async event loop built int like JS does, so the network callbacks cannot be async. Instead, we are forced to queue the messages into an async context and then explicitly schedule the message handler to run in the background, which is made very easy thanks to asyncio-mqtt!
Thanks for your fast support and great work on this library!
from aiomqtt.
Hi there, I agree, would you be willing to make a PR to make this clearer in the docs? π
from aiomqtt.
Hi there, I agree, would you be willing to make a PR to make this clearer in the docs?
I got ninja'd π
from aiomqtt.
Hi noperator-zz, thanks for raising this issue. Let me have a look.
That should work. Unless the long_running_handler
blocks the event loop. What's happening in that function? Does it yield control back to the event loop or block it for the duration of the call?
from aiomqtt.
Thanks for taking a look at this. Do you mean my original example (without ensure_future) should handle messages concurrently?
Here's a full example for completeness:
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from asyncio_mqtt import Client, MqttError
async def advanced_example():
async with AsyncExitStack() as stack:
tasks = set()
# Connect to the MQTT broker
client = Client("test.mosquitto.org")
await stack.enter_async_context(client)
messages = await stack.enter_async_context(client.unfiltered_messages())
task = asyncio.create_task(message_handler(messages))
tasks.add(task)
await client.subscribe("test", qos=2)
task = asyncio.create_task(post_to_topics(client))
tasks.add(task)
await asyncio.gather(*tasks)
async def post_to_topics(client):
x = 0
while True:
await client.publish("test", ("test %s" % x).encode("utf-8"), qos=2)
await asyncio.sleep(1)
x += 1
async def message_handler(messages):
async for message in messages:
# Using 'await' here, only one message will be processed at a time
await on_message(message)
# Explicitly starting the handler in the background is required to handle messages concurrently
# asyncio.ensure_future(on_message(message))
async def on_message(message):
print(f"Start handler: {message.payload}")
result = await long_running_handler(message)
print(f"Finished handler: {message.payload}: {result}")
async def long_running_handler(message):
await asyncio.sleep(5)
return message.payload
asyncio.run(advanced_example())
In order for messages to be handled concurrently in this example, message_handler
must be modified to use the ensure_future
line instead of the await
line. This is the part which originally threw me off. I thought the asyncio.sleep in long_running_handler
would allow the function to yield and move onto the next message, but perhaps my understanding of asyncio is flawed.
from aiomqtt.
Thanks for the follow-up. Ah, now that I see the full example code I get it.
I was wrong in my previous reply. The 5 second wait between messages is indeed the intended behaviour.
async def message_handler(messages):
# You only get a new message when the code hits this for loop. asyncio-mqtt queues up all the messages in
# the background. If you're too slow to process the messages, you may run out of memory at some point.
# This is why we have the `queue_maxsize` parameter for the `filtered_messages` function. Per default, the
# queue is not bounded. If you bound the queue (e.g., `queue_maxsize=10`), asyncio-mqtt will drop messages
# if the queue is full.
async for message in messages:
# Note that `on_message` only returns after ~5 seconds.
# Therefore, we won't hit the for loop before it's done. In other words, we won't get a new message
# before `on_message` returns.
await on_message(message)
You already found the right tool to do what you want: asyncio.ensure_future(on_message(message))
. This function schedules on_message(message)
and returns immediately. Therefore, we hit the for loop right away (thus we process the next message right away without the 5 second delay).
Does it make sense? :)
from aiomqtt.
This is really important. Should be added to the doc.
from aiomqtt.
This is really important. Should be added to the doc.
I agree. :) PRs are welcome!
Also, the modern (python 3.11) approach is to use asyncio.TaskGroup
instead of asyncio.ensure_future
. E.g.:
async def message_handler(messages):
async with asyncio.TaskGroup() as tg:
async for message in messages:
tg.create_task(on_message(message))
from aiomqtt.
Related Issues (20)
- Impossible to reuse client in case of connection timeout HOT 7
- Typing warnings in IDE on Client methods HOT 2
- AttributeError: module 'aiomqtt' has no attribute '__version__'
- session persistence with protocol version 5 HOT 1
- Pin Paho to <2.0 HOT 2
- Make aiomqtt compatible with paho v2 HOT 10
- Topic.matches should be used when doing == HOT 4
- How to get the IP of a failed connection attempt
- strange reconnection HOT 2
- MQTT broker reports a disconnection/reconnection... but no aiomqtt.MqttError is raised HOT 4
- Exceptions on __aenter__ not handled. HOT 2
- Reconnect Bug HOT 1
- Cannot instantiate a client due to internal mqtt problem HOT 2
- Issues with uvicorn on Windows 10 HOT 5
- [BUG] Cannot install version 2.0.0 with paho-mqtt 2.0.0 HOT 2
- [BUG] 127.0.0.1/localhost Work Incorrectly HOT 3
- Do not send messages for a long time mqtt automatically disconnects HOT 1
- Can't read incoming messages in pytests HOT 8
- No convenient way to get message without getting locked into a for loop HOT 2
- Can aiomqtt queue has a ring buffer optionοΌ for high frequense in-comming messages? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
π Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. πππ
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google β€οΈ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiomqtt.