Comments (11)

vitalerter commented on June 28, 2024

I see the warnings less often now.
I think an option to set the queue length is needed.

from aiomqtt.

bachya commented on June 28, 2024

@frederikaalund Yes, makes sense – I had thought of a semaphore, too, but appreciate your guidance on where to utilize it. I'll submit a PR soon!

frederikaalund commented on June 28, 2024

Hi vitalerter. Thanks for submitting this issue. Let me have a look. πŸ‘

This "pending publish calls" warning appears when there are more than ten messages in the "publish queue". It is basically a sign of congestion. In other words: You publish messages faster than the system can process them.

If the number of "pending publish calls":

  • increases over time, you must take action. The process will run out of memory at some point in time.
  • decreases over time, you don't have to worry.

There is no back pressure in asyncio-mqtt as it is now. It would be pretty straightforward to add a global back pressure mechanism for all the possible "pending calls": simply set the max_size parameter of the pending-calls queue. I just never got around to doing it. Do you want to have a go at it?
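To illustrate the idea with a generic asyncio sketch (a hypothetical producer/consumer, not asyncio-mqtt's internals): a bounded asyncio.Queue suspends the producer inside put() whenever maxsize items are already pending, which is exactly the back pressure behaviour described above.

```python
import asyncio


async def main():
    # A bounded queue: put() suspends the producer once 2 items are pending.
    queue = asyncio.Queue(maxsize=2)
    processed = []

    async def producer():
        for i in range(5):
            await queue.put(i)  # blocks while the queue is full
        await queue.put(None)  # sentinel: no more items

    async def consumer():
        while (item := await queue.get()) is not None:
            await asyncio.sleep(0)  # simulate (fast) processing
            processed.append(item)

    # Run both concurrently; the producer is throttled to the consumer's pace.
    await asyncio.gather(producer(), consumer())
    return processed


print(asyncio.run(main()))  # → [0, 1, 2, 3, 4]
```

The producer never gets more than two items ahead, so memory stays bounded no matter how fast it tries to publish.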

~Frederik

vitalerter commented on June 28, 2024

It occurs when publishing with qos > 0.

async def mqtt_publish(self, topic, msg, broker):
    try:
        res = await self.clients.get(broker).publish(topic, msg, qos=1)
        _logging.debug("Mqtt publishing", extra={
            "data": {"result": res, "message": msg, "topic": topic}})
    except MqttError as identifier:
        _logging.error("Mqtt publishing Failed", extra={
            "data": {"exception": identifier, "message": msg, "topic": topic}})

frederikaalund commented on June 28, 2024

It occurs when publishing with qos > 0.

That does make sense, since qos > 0 requires more communication (and hence more time to complete). :)

Again, this warning is a sign of congestion. How many messages are you publishing and how fast are you publishing them?

vitalerter commented on June 28, 2024

Hi.
I increased self._pending_calls_threshold to 50 and the warning no longer appears.

frederikaalund commented on June 28, 2024

I increased self._pending_calls_threshold to 50 and the warning no longer appears.

The question, then, is whether:

  • you'll still see the warning, just at a much later time (since the threshold is much larger now), or
  • the threshold is now large enough to accommodate time-limited congestion.

In any case, I'm glad that it worked out for you. I don't want to increase the default threshold just yet as I still believe the current threshold fits the common use case. If we get additional reports about these warnings, then I'll do one or more of the following:

  • Reconsider the default
  • Provide an API to change the default threshold
  • Implement a proper back pressure mechanism (make the queuing visible in the API design)

How does that sound?

bachya commented on June 28, 2024

@frederikaalund I'm noticing something similar in my library (via this user report: bachya/ecowitt2mqtt#103). Are there still plans to implement back pressure in this library?

frederikaalund commented on June 28, 2024

Thanks for the input, @bachya.

Are there still plans to implement back pressure in this library?

It's still on the table. I don't, however, have any plans to implement this myself. Pull requests are always welcome. :)

bachya commented on June 28, 2024

@frederikaalund Happy to submit a PR if I can get some idea of where to tackle this in the codebase. You mentioned above:

It would be pretty straightforward to add a global back pressure mechanism for all the possible "pending calls": simply set the max_size parameter of the pending-calls queue.

I assume this means allow for some public-API control of:

https://github.com/sbtinstruments/asyncio-mqtt/blob/2a352ba87c35d4c4ee22fa4420823828e1fff025/asyncio_mqtt/client.py#L312-L314

...when someone instantiates a Client?

frederikaalund commented on June 28, 2024

Happy to submit a PR if I can get some idea of where to tackle this in the codebase

Sounds good. πŸ‘ I'll try to help you. :)

https://github.com/sbtinstruments/asyncio-mqtt/blob/2a352ba87c35d4c4ee22fa4420823828e1fff025/asyncio_mqtt/client.py#L312-L314

This queue is for incoming messages, i.e., the messages that the server sends to the client. Unfortunately, it's impossible to implement back pressure in that direction: the server can spam the client with messages as much as it wants.

The message in question here is:

There are 13 pending publish calls.

This is the queue for the outgoing messages. That is, the messages that the client sends to the server. This is where we can apply back pressure. We can block the caller (the user code) before they spam us with messages to send to the server.

Background info

Internally, we wrap outgoing calls (publish, subscribe, unsubscribe) like this (code is for publish):

https://github.com/sbtinstruments/asyncio-mqtt/blob/2a352ba87c35d4c4ee22fa4420823828e1fff025/asyncio_mqtt/client.py#L251-L255

In turn, the _pending_call context manager contains:

https://github.com/sbtinstruments/asyncio-mqtt/blob/2a352ba87c35d4c4ee22fa4420823828e1fff025/asyncio_mqtt/client.py#L373-L379

This is where the warning comes from.

Proposal 1

Note that the warning message is just that: A warning. All calls go through regardless. It's just a sign of congestion. Another "fix" is to simply increase the warning threshold (default is 10) or disable it altogether.

I put the warning there to inform the user about congestion.

Proposal 2

Apply back pressure to the user code to avoid congestion in the first place. That is, we want to limit the number of concurrent outgoing calls. In other words, we treat the outgoing calls like a shared resource that we want to limit access to. Does it sound familiar yet? That's what a semaphore is for.

I propose that we change:

async def publish(self, ...) -> None:
    ...

into:

async def publish(self, ...) -> None:
    async with self._outgoing_calls_sem:
        ...

Simple, right? :) Now, each outgoing call (e.g., publish) blocks until we can acquire the semaphore (self._outgoing_calls_sem).
We construct the semaphore in Client.__init__:

self._outgoing_calls_sem = asyncio.Semaphore(max_concurrent_outgoing_calls)

where max_concurrent_outgoing_calls is a new keyword argument for Client.__init__.
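The limiting behaviour of such a semaphore can be demonstrated in isolation (a generic asyncio sketch, independent of asyncio-mqtt): six simulated outgoing calls compete for a Semaphore(2), and the peak number of concurrent calls never exceeds two.

```python
import asyncio


async def main():
    sem = asyncio.Semaphore(2)  # at most two concurrent "outgoing calls"
    in_flight = 0
    peak = 0

    async def one_call():
        nonlocal in_flight, peak
        async with sem:  # blocks here while two calls are already in flight
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # simulate a network round-trip
            in_flight -= 1

    # Launch six calls at once; the semaphore serializes them two at a time.
    await asyncio.gather(*(one_call() for _ in range(6)))
    return peak


print(asyncio.run(main()))  # → 2
```

The remaining four callers simply wait inside `async with sem:` instead of piling up as pending calls, which is the back pressure we want.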

Of course, we need to change the other outgoing calls (subscribe and unsubscribe) accordingly. We could even wrap all of this in a decorator like so:

@outgoing_call
async def publish(self, ...) -> None:
    ...
@outgoing_call
async def subscribe(self, ...) -> int:
    ...
@outgoing_call
async def unsubscribe(self, ...) -> None:
    ...

I leave the implementation of that as an exercise for the user. :)

Likewise, we need to support the case that max_concurrent_outgoing_calls=None to be backward-compatible.

@bachya Are you up for this? Let me know if you have any questions.
