Giter Site home page Giter Site logo

Comments (9)

noperator-zz avatar noperator-zz commented on June 28, 2024 1

The easiest workaround is just to put the entire handler is a separate coroutine and start it with ensure_future

async def on_message(message):
    result = await long_running_handler(topic, payload) 
    do_something(result)


async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
			asyncio.ensure_future(on_message(message))

Admittedly, I'm new to asyncio and this took far too long to figure out. Seems obvious in retrospect now. I'm leaving this open in case anyone has other suggestions, but this fix works for me.

from aiomqtt.

noperator-zz avatar noperator-zz commented on June 28, 2024 1

Great, that makes sense. So my confusion stemmed from the fact that the Javascript async-mqtt library uses an async network callback function. This allows me to use await without 'blocking' the callback from being called as soon as the next message arrives.

Obviously standard Python does not have an async event loop built int like JS does, so the network callbacks cannot be async. Instead, we are forced to queue the messages into an async context and then explicitly schedule the message handler to run in the background, which is made very easy thanks to asyncio-mqtt!

Thanks for your fast support and great work on this library!

from aiomqtt.

empicano avatar empicano commented on June 28, 2024 1

Hi there, I agree, would you be willing to make a PR to make this clearer in the docs? 😊

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 28, 2024 1

Hi there, I agree, would you be willing to make a PR to make this clearer in the docs?

I got ninja'd πŸ˜…

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 28, 2024

Hi noperator-zz, thanks for raising this issue. Let me have a look.

That should work. Unless the long_running_handler blocks the event loop. What's happening in that function? Does it yield control back to the event loop or block it for the duration of the call?

from aiomqtt.

noperator-zz avatar noperator-zz commented on June 28, 2024

Thanks for taking a look at this. Do you mean my original example (without ensure_future) should handle messages concurrently?

Here's a full example for completeness:

import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from asyncio_mqtt import Client, MqttError


async def advanced_example():
    async with AsyncExitStack() as stack:
        tasks = set()

        # Connect to the MQTT broker
        client = Client("test.mosquitto.org")
        await stack.enter_async_context(client)

        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(message_handler(messages))
        tasks.add(task)

        await client.subscribe("test", qos=2)

        task = asyncio.create_task(post_to_topics(client))
        tasks.add(task)

        await asyncio.gather(*tasks)


async def post_to_topics(client):
    x = 0
    while True:
        await client.publish("test", ("test %s" % x).encode("utf-8"), qos=2)
        await asyncio.sleep(1)
        x += 1


async def message_handler(messages):
    async for message in messages:
        # Using 'await' here, only one message will be processed at a time
        await on_message(message)

        # Explicitly starting the handler in the background is required to handle messages concurrently
        # asyncio.ensure_future(on_message(message))


async def on_message(message):
    print(f"Start handler: {message.payload}")
    result = await long_running_handler(message)
    print(f"Finished handler: {message.payload}: {result}")


async def long_running_handler(message):
    await asyncio.sleep(5)
    return message.payload


asyncio.run(advanced_example())

In order for messages to be handled concurrently in this example, message_handler must be modified to use the ensure_future line instead of the await line. This is the part which originally threw me off. I thought the asyncio.sleep in long_running_handler would allow the function to yield and move onto the next message, but perhaps my understanding of asyncio is flawed.

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 28, 2024

Thanks for the follow-up. Ah, now that I see the full example code I get it.

I was wrong in my previous reply. The 5 second wait between messages is indeed the intended behaviour.

async def message_handler(messages):
    # You only get a new message when the code hits this for loop. asyncio-mqtt queues up all the messages in
    # the background. If you're too slow to process the messages, you may run out of memory at some point.
    # This is why we have the `queue_maxsize` parameter for the `filtered_messages` function. Per default, the
    # queue is not bounded. If you bound the queue (e.g., `queue_maxsize=10`), asyncio-mqtt will drop messages
    # if the queue is full.
    async for message in messages:
        # Note that `on_message` only returns after ~5 seconds.
        # Therefore, we won't hit the for loop before it's done. In other words, we won't get a new message
        # before `on_message` returns.
        await on_message(message)

You already found the right tool to do what you want: asyncio.ensure_future(on_message(message)). This function schedules on_message(message) and returns immediately. Therefore, we hit the for loop right away (thus we process the next message right away without the 5 second delay).

Does it make sense? :)

from aiomqtt.

thalesmaoa avatar thalesmaoa commented on June 28, 2024

This is really important. Should be added to the doc.

from aiomqtt.

frederikaalund avatar frederikaalund commented on June 28, 2024

This is really important. Should be added to the doc.

I agree. :) PRs are welcome!

Also, the modern (python 3.11) approach is to use asyncio.TaskGroup instead of asyncio.ensure_future. E.g.:

async def message_handler(messages):
    async with asyncio.TaskGroup() as tg:
        async for message in messages:
            tg.create_task(on_message(message))

from aiomqtt.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    πŸ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. πŸ“ŠπŸ“ˆπŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❀️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.