Comments (11)
I see Warnings less now
I think option to set queue length is need.
from aiomqtt.
@frederikaalund Yes, makes sense β I had thought of a semaphore, too, but appreciate your guidance on where to utilize it. I'll submit a PR soon!
from aiomqtt.
Hi vitalerter. Thanks for submitting this issue. Let me have a look. π
This "pending publish calls" warning appears when there are more than ten messages in the "publish queue". It is basically a sign of congestion. In other words: You publish messages faster than the system can process them.
If the number of "pending publish calls":
- increases over time, you must take action. The process will run out of memory at some point in time.
- decreases over time, you don't have to worry.
There is no back pressure in asyncio-mqtt as it is now. It would be pretty straight-forward to add a global back pressure system for all the possible "pending calls". Simply fix the max_size
parameter of the pending calls queue. I just never got around to do that. Do you want to have a go at it?
~Frederik
from aiomqtt.
it is occurs when publishing with qos>0
async def mqtt_publish(self, topic, msg, broker):
try:
res = await self.clients.get(broker).publish(topic, msg, qos=1)
_logging.debug("Mqtt publishing", extra={
"data": {"result": res, "message": msg, "topic": topic}})
except MqttError as identifier:
_logging.error("Mqtt publishing Failed", extra={
"data": {"exception": identifier, "message": msg, "topic": topic}})
from aiomqtt.
it is occurs when publishing with qos>0
That does make sense, since qos > 0
requires more communication (and hence more time to complete). :)
Again, this warning is a sign of congestion. How many messages are you publishing and how fast are you publishing them?
from aiomqtt.
Hi.
I increased the self._pending_calls_threshold = 50
this is not showing me the message.
from aiomqtt.
I increased the self._pending_calls_threshold = 50 this is not showing me the message.
Question is then whether:
- You'll still see the warning but just at a much later time (since the threshold is much larger now)
- The threshold is now large enough to account for time-limited congestion.
In any case, I'm glad that it worked out for you. I don't want to increase the default threshold just yet as I still believe the current threshold fits the common use case. If we get additional reports about these warnings, then I'll do one or more of the following:
- Reconsider the default
- Provide an API to change the default threshold
- Implement a proper back pressure mechanism (make the queuing visible in the API design)
How does that sound?
from aiomqtt.
@frederikaalund I'm noticing something similar in my library (via this user report: bachya/ecowitt2mqtt#103). Are there still plans to implement back pressure in this library?
from aiomqtt.
Thanks for the input, @bachya.
Are there still plans to implement back pressure in this library?
It's still on the table. I don't, however, have any plans to implement this myself. Pull requests are always welcome. :)
from aiomqtt.
@frederikaalund Happy to submit a PR if I can get some idea of where to tackle this in the codebase. You mentioned above:
It would be pretty straight-forward to add a global back pressure system for all the possible "pending calls". Simply fix the max_size parameter of the pending calls queue.
I assume this means allow for some public-API control of:
...when someone instantiates a Client
?
from aiomqtt.
Happy to submit a PR if I can get some idea of where to tackle this in the codebase
Sounds good. π I'll try to help you. :)
This queue is for the incoming messages. That is, the messages that the server sends to the client. Unfortunately, it's impossible to implement back pressure in that direction. The server can spam the client with messages as much as it want.
The message in question here is:
There are 13 pending publish calls.
This is the queue for the outgoing messages. That is, the messages that the client sends to the server. This is where we can apply back pressure. We can block the caller (the user code) before they spam us with messages to send to the server.
Background info
Internally, we wrap outgoing calls (publish
, subscribe
, unsubscribe
) like this (code is for publish
):
In turn, the _pending_call
context manager contains:
This is where the warning comes from.
Proposal 1
Note that the warning message is just that: A warning. All calls go through regardless. It's just a sign of congestion. Another "fix" is to simply increase the warning threshold (default is 10
) or disable it altogether.
I put the warning there to inform the user about congestion.
Proposal 2
Apply back pressure to the user code to avoid congestion in the first place. That is, we want to limit the number of concurrent outgoing calls. In other words, we treat the outgoing calls like a shared resource that we want to limit access to. Does it sound familiar yet? That's what a semaphore is for.
I propose that we change:
async def publish(self, ...) -> None:
...
into:
async def publish(self, ...) -> None:
async with self._outgoing_calls_sem:
...
Simple, right? :) Now, each outgoing call (e.g., publish
) blocks until we can acquire the semaphore (self._outgoing_calls_sem
).
We construct the semaphore in Client.__init__
:
self._outgoing_calls_sem = asyncio.Semaphore(max_concurrent_outgoing_calls)
where max_concurrent_outgoing_calls
is a new keyword argument for Client.__init__
.
Of course, we need to change the other outgoing calls (subscribe
and unsubscribe
) accordingly. We could even wrap all of this in a decorator like so:
@outgoing_call
async def publish(self, ...) -> None:
...
@outgoing_call
async def subscribe(self, ...) -> int:
...
@outgoing_call
async def unsubscribe(self, ...) -> None:
...
I leave the implementation of that as an exercise for the user. :)
Likewise, we need to support the case that max_concurrent_outgoing_calls=None
to be backward-compatible.
@bachya Are you up for this? Let me know if you have any questions.
from aiomqtt.
Related Issues (20)
- Impossible to reuse client in case of connection timeout HOT 7
- Typing warnings in IDE on Client methods HOT 2
- AttributeError: module 'aiomqtt' has no attribute '__version__'
- session persistence with protocol version 5 HOT 1
- Pin Paho to <2.0 HOT 2
- Make aiomqtt compatible with paho v2 HOT 10
- Topic.matches should be used when doing == HOT 4
- How to get the IP of a failed connection attempt
- strange reconnection HOT 2
- MQTT broker reports a disconnection/reconnection... but no aiomqtt.MqttError is raised HOT 4
- Exceptions on __aenter__ not handled. HOT 2
- Reconnect Bug HOT 1
- Cannot instantiate a client due to internal mqtt problem HOT 2
- Issues with uvicorn on Windows 10 HOT 5
- [BUG] Cannot install version 2.0.0 with paho-mqtt 2.0.0 HOT 2
- [BUG] 127.0.0.1/localhost Work Incorrectly HOT 3
- Do not send messages for a long time mqtt automatically disconnects HOT 1
- Can't read incoming messages in pytests HOT 8
- No convenient way to get message without getting locked into a for loop HOT 2
- Can aiomqtt queue has a ring buffer optionοΌ for high frequense in-comming messages? HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
π Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. πππ
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google β€οΈ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from aiomqtt.