
AIS decoding problem about avnav (closed)

quantenschaum commented on June 27, 2024
AIS decoding problem

wellenvogel commented on June 27, 2024

Could you provide a log of test data with the problem?
The AIS fetch interval of the UI can be adapted in the settings (default: 30s).

quantenschaum commented on June 27, 2024

Thanks for the quick reply. What do you mean by log? The input data stream or the NmeaLogger?

  • Settings.UpdateTimes.AIS=5000ms (that was the actual default)
  • data.log acquired with nc localhost 6000 >data.log in parallel to running AvNav; each transmission starts with the RMC sentence, one pack of data per second
  • 2024-02-13.nmea.log from AvNav, with sentences missing due to the filtering
  • avnav.log
  • nothing on the js console of the browser (brave/chrome)

quantenschaum commented on June 27, 2024

Side note: When switching to SK as input, it is not sufficient to disable the SocketReader and enable the SignalKHandler - it won't pick up the AIS data. I have to restart AvNav to get the data in via SK.

wellenvogel commented on June 27, 2024

The logs are what I need.
Btw: you can get complete NMEA logs from AvNav by emptying the filter and setting the time to 0.

For the switching problem:
It would be helpful to get some more info - like the settings before/after and the exact observations + log.

quantenschaum commented on June 27, 2024

I found one error in my code: the padding of the type 5 messages was wrong, so there were some extra trailing zero bits. The SK decoder didn't care about that, and AvNav did also decode some of these messages correctly and displayed the vessels' names. Now, with this fixed, all AIS targets show their names, but still not all of the targets are displayed - some are still missing.

Only 8 of 10 AIS targets are displayed.

(screenshot)

2024-02-14.nmea.log

quantenschaum commented on June 27, 2024

If I omit the type 5 messages and only send type 1, then all targets are displayed (w/o static info, of course). So I don't think that the type 1 messages are malformed. There are missing targets when type 1 and type 5 messages are submitted together.

All of the 10 targets show up.

(screenshot)

2024-02-14.nmea.no-type5.log

quantenschaum commented on June 27, 2024

OpenCPN also handles the input as expected and shows all of the 10 targets.

quantenschaum commented on June 27, 2024

Which targets are missing seems to depend on the order of the AIS NMEA sentences.

quantenschaum commented on June 27, 2024

For generating my test data I switched to pyAIS, which is excellent for this purpose.
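For reference, a minimal sketch of how such test sentences can be generated (all field values here are made up; this uses pyais' encode_dict helper - check the pyais docs for the exact set of supported keys):

from pyais.encode import encode_dict

# type 1 position report - values invented for illustration
position = {'type': 1, 'mmsi': 123456789, 'lat': 54.1, 'lon': 10.5,
            'speed': 5.0, 'course': 90.0}
# type 5 static/voyage data for the same vessel
static = {'type': 5, 'mmsi': 123456789,
          'shipname': 'TEST VESSEL', 'callsign': 'DA1234'}

for sentence in encode_dict(position, talker_id="AIVDM") + encode_dict(static, talker_id="AIVDM"):
    print(sentence)  # ready-to-send !AIVDM sentences (type 5 spans two of them)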

The problem persists, some targets are missing. I don't think it is related to the input data.

I also observed the following things.

  • When stopping the input data stream, the AIS targets remain on the display, seemingly forever, even after reloading the page. They should be marked as lost and removed after configurable intervals (as in OpenCPN).
  • When the stream is started again, AvNav sometimes does not pick up the new data, or only with a big delay. (OpenCPN picks it up immediately.)
  • Virtual and out-of-position AtoNs are not visually marked as such (OpenCPN does mark them).
  • When the interval between AIS data transmissions is set to 60 s, even more targets are missing. (This works with OpenCPN.)

Seems there are really some issues with AIS processing.

(screenshot)

wellenvogel commented on June 27, 2024

For the removal of AIS data:
Refer to https://www.wellenvogel.net/software/avnav/docs/hints/configfile.html?lang=en, aisExpiryTime. Default 1200 seconds.
For the other parts I will have a look.

quantenschaum commented on June 27, 2024

Thanks! But these settings cannot be set in the web interface? - Yes, they can, in the config section. Good to know.

It works, I just didn't wait the 20 minutes.

quantenschaum commented on June 27, 2024

From the log:

2024-02-18 00:35:16,075-305118-305147-Feeder-ERROR-decoder lost 10 records
2024-02-18 00:35:16,080-305118-305147-Feeder-ERROR-decoder lost 15 records
2024-02-18 00:35:16,086-305118-305147-Feeder-ERROR-decoder lost 5 records
2024-02-18 00:35:26,089-305118-305147-Feeder-ERROR-decoder lost 25 records
2024-02-18 00:35:26,097-305118-305147-Feeder-ERROR-decoder lost 15 records
2024-02-18 00:35:26,099-305118-305147-Feeder-ERROR-decoder lost 5 records

quantenschaum commented on June 27, 2024

Is it possible to get print()s to appear on stdout?

quantenschaum commented on June 27, 2024

Increasing feeder fetchFromHistory maxEntries=100 solves the issue.

  • all targets are displayed
  • changes are picked up immediately

Perfect! 😄

Looks like a rate limiter to me: NMEA sentences are discarded if they come in too fast, to prevent AvNav from getting stalled and lagging behind. Only the newest data is processed, right?

Maybe it would be better to use a token bucket for rate limiting to allow for bursts, instead of this hard, seemingly pretty low limit of 10 entries?
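For illustration, a minimal token bucket sketch (class and parameter names are mine, not AvNav's): the bucket refills at the allowed average rate, while its capacity absorbs bursts:

import time

class TokenBucket:
    # rate: average messages/s allowed; capacity: maximum burst size
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # caller drops (or defers) the message

bucket = TokenBucket(rate=10, capacity=100)  # 10 msg/s average, bursts up to 100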

quantenschaum commented on June 27, 2024

I poked around in the code with the debugger. Some thoughts on what I found.

  • When updating/merging AIS data (type 1+5) one should simply overwrite old with new values, but the timestamp should be bound to the position (lat/lon). Why? Because the timestamp/age is used for computing the DR position 👻 of the target. If the record gets updated by a type 5 message, the position is unchanged, but the 👻 jumps back. Position data is what is important.
  • Dynamic AIS data contains a timestamp field (seconds of transmission); the timestamp of the record in AvNav should use this.
  • Converting the AIS raw data to real values in avnav_util should not add fields that are not present in the raw data (like position). Then the merging also gets much simpler, just dict.update() (see the sketch after this list).
  • Malformed data should be explicitly marked (set to None), so that these fields are invalidated during the update.
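A sketch of this merge idea (record layout and field names are invented for illustration; this is not AvNav's actual data structure):

import time

def merge_ais(record, decoded):
    # 'decoded' contains only fields actually present in the raw message
    has_position = 'lat' in decoded and 'lon' in decoded
    record.update(decoded)      # simply overwrite old with new values
    if has_position:
        # bind the timestamp to the position, so the DR "ghost" only
        # advances when a new position actually arrived
        record['posTimestamp'] = time.time()
    return record

target = {'mmsi': 123456789}
merge_ais(target, {'lat': 54.1, 'lon': 10.5, 'speed': 5.0})  # type 1: position + timestamp
merge_ais(target, {'shipname': 'TEST VESSEL'})               # type 5: timestamp untouched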

wellenvogel commented on June 27, 2024

Is it possible to get print()s to appear on stdout?

You would need to change avnav_server.py, line 65.

Looks like a rate limiter to me: NMEA sentences are discarded if they come in too fast, to prevent AvNav from getting stalled and lagging behind. Only the newest data is processed, right? Maybe it would be better to use a token bucket for rate limiting to allow for bursts, instead of this hard, seemingly pretty low limit of 10 entries?

At the end this is some fragile balance there... If you send in data at rates higher than what the decoder can handle, you will have to drop them somehow (or queue back at the input - with rather unpredictable results, like messages being delayed for minutes or more...). The internal queue by default is 300 entries. So you could increase the maxEntries up to this value without problems. But at the end you still cannot be sure that the system will be able to handle data sent in at an unlimited rate.
I guess other systems like SK or OpenCPN will simply "generate back pressure" - i.e. not read the data until they have been able to process them.
So what we could do to increase the bucket size is really to set the maxEntries to the queue size (but this increases the probability of delayed messages). This would especially get interesting for all writers that send out from the queue - if their partners generate back pressure, they would potentially send out rather old messages. Maybe this would mean to additionally add some age handling to avoid picking up messages being too old.
Finally, I guess your testing environment should not send at higher rates than what you could expect in reality - we have some limits by the VHF protocols...

When updating/merging AIS data (type 1+5) one should simply overwrite old with new values, but the timestamp should be bound to the position (lat/lon). Why? Because the timestamp/age is used for computing the DR position 👻 of the target. If the record gets updated by a type 5 message, the position is unchanged, but the 👻 jumps back. Position data is what is important.

I guess this is a valid point. The timestamp was updated to extend the lifetime - but your point is more important.

Dynamic AIS data contains a timestamp field (seconds of transmission); the timestamp of the record in AvNav should use this.

This is a rather complex field. Finally you cannot be sure that the system time of the AvNav server is perfectly in sync with the sending device (and that the sending device has a valid time). So this would need some handling anyway. Furthermore I have to change the timestamp handling to deal with situations where the system time is modified anyway. So for all period measurements I will switch to steady timers that are independent of the system time.
And AIS data from SignalK is a story of its own. Potentially they use the SK server timestamp.
So for now I feel it's safer to stay with the current meaning of "arrival time at AvNav".

Converting the AIS raw data to real values in avnav_util should not add fields that are not present in the raw data (like position). Then the merging also gets much simpler, just dict.update(). Malformed data should be explicitly marked (set to None), so that these fields are invalidated during the update.

Valid point. But I still would limit the updates from 5/24 to the allowed fields, to prevent overwrites of the important information by bugs in the messages.

quantenschaum commented on June 27, 2024

This is a rather complex field. Finally you cannot be sure that the system time of the AvNav server is perfectly in sync with the sending device (and that the sending device has a valid time). So this would need some handling anyway. Furthermore I have to change the timestamp handling to deal with situations where the system time is modified anyway. So for all period measurements I will switch to steady timers that are independent of the system time.

Indeed. I think there is no way to handle out-of-sync clocks without knowing that they are out of sync. How could you know that? But I would assume that the clocks are in sync, since AIS is linked to GPS, which provides the best time signal ever. It's probably safe to assume that all AIS transmitters use GPS time. Now your local clock has to be in sync with GPS time as well. AvNav could maintain its own clock internally (w/o modifying the system time) in sync with GPS time. (This would also allow speeding up the clock in simulations for testing.)

Anyway, are measures taken to sync the clocks of the AvNav server and the client code in the browsers?

And AIS data from SignalK is a story of its own. Potentially they use the SK server timestamp.

That is true. If there is no timestamp in the SK data, then you can only use the arrival time in AvNav.

quantenschaum commented on June 27, 2024

Is there a way to dynamically determine the max capacity of the system and only drop messages if this limit is exceeded?

What happens if writers generate backpressure now?

quantenschaum commented on June 27, 2024

My simulator is sending data every 1 second, which is not much. The point is that AIS data causes bursts of data every 10 s, but the average rate is about 7 sentences/s.

wellenvogel commented on June 27, 2024

What happens if writers generate backpressure now?

Exactly what you see from the feeder.
They use the same method to fetch from the queue.

Is there a way to dynamically determine the max capacity of the system and only drop messages if this limit is exceeded?

I guess there is no easy way to do that. It potentially heavily depends on the overall system load, memory utilization (i.e. I/O wait) and a lot of other factors.
Therefore I guess the current solution is some compromise. I think the best change could be to allow for larger chunks (maybe even increase the queue size) and add some arrival timestamp + some max delay, with dropping when exceeded.
Typically I would assume a delay of a couple of seconds (...3?) would be ok for a max delay.
For the decoder itself (or maybe for other queue users) things could be improved by filtering on priority (i.e. start dropping lower prio stuff like AIS :-( earlier than position).

It's probably safe to assume that all AIS transmitters use GPS time. Now your local clock has to be in sync with GPS time as well. AvNav could maintain its own clock internally (w/o modifying the system time) in sync with GPS time. (This would also allow speeding up the clock in simulations for testing.)

Not sure on that. The whole thing is really a bit tricky. If you are "offline" you most probably want to sync the system time with the received GPS data. But if you are "online" - maybe at home or in a harbour (potentially with GPS even switched off) - you need to sync with an NTP time source to be able to update your software.
This is what AvNav does (when running with the Raspberry support).
In an ideal world everything would be in sync - but in reality things can differ.
It's getting even worse when we think about testing (simulated or recorded data).
And letting AvNav internally have a different time than the system would be possible to a certain degree - but most probably rather confusing for the users.
My feeling therefore really is that relying on the arrival time is not that bad. But we could add a switch to change this - why not.
On Android things are even getting worse btw...

Anyway, are measures taken to sync the clocks of the AvNav server and the client code in the browsers?

Not really. But all the times that are shown are based on the server times.

quantenschaum commented on June 27, 2024

Nice. - The more you think about it, the more interesting and complicated it gets, as always.

quantenschaum commented on June 27, 2024

I'm just thinking out loud here...

  • the feeder is effectively a ring buffer for the incoming NMEA messages, limited to 300 entries
  • each message gets a unique sequence id
  • to keep track of which messages have been processed, the sequence counter is used
  • currently only the 10 most recent packets are fetched from the buffer, the rest is discarded

Imagine the following:

  • the buffer is empty, all handlers are idle, there is no (back)pressure in the system (like right after startup of AvNav)
  • now 50 messages are pushed into the buffer
  • 10 messages are fetched from the buffer for processing
  • 40 messages are discarded

👉 This is what causes this issue. IMHO this does not make sense, because the system was idle, there was no pressure, it was not at its limit. There is no reason to discard any messages. Currently it is assumed that messages trickle in one by one, not in bursts. But is this assumption justified? Hard to tell without a warning being displayed to the user. So no one will notice without crawling through the logs - and who does?

It should work somehow like this:

  • a chunk containing the 10 oldest but unprocessed messages is fetched (FIFO) for processing
  • the remaining 40 messages remain in the buffer (seq+=10)
  • more messages may be pushed into the buffer in the meanwhile
  • the next chunk is fetched for processing

Now, of course, it may happen that messages are added to the buffer faster than the handler can process them, causing a stalled system and delayed processing of the data. This must be prevented by discarding older or less important unprocessed messages. A warning should be displayed to tell the user about this. But this should only happen if the system hits its limit, not immediately on a burst which the system can handle.
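A toy sketch of the intended behaviour (my naming, not the actual feeder code): the ring buffer keeps everything it can, and each consumer works it off in FIFO chunks via its own sequence number:

from collections import deque

class Feeder:
    def __init__(self, maxlen=300):
        self.buffer = deque(maxlen=maxlen)  # ring buffer, oldest entries fall out
        self.seq = 0                        # sequence id of the next incoming message

    def push(self, msg):
        self.buffer.append(msg)
        self.seq += 1

    def fetch(self, consumer_seq, chunk=10):
        # oldest sequence id still present in the ring buffer
        oldest = self.seq - len(self.buffer)
        start = max(consumer_seq, oldest)   # skip only what really fell out of the buffer
        msgs = [self.buffer[i - oldest] for i in range(start, min(start + chunk, self.seq))]
        return start + len(msgs), msgs      # new consumer sequence + FIFO chunk

With this, a burst of 50 messages is worked off in five FIFO chunks of 10 instead of 40 of them being discarded.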

How to do this?

tbc

quantenschaum commented on June 27, 2024

What happens if writers generate backpressure now?

Exactly what you see from the feeder. They use the same method to fetch from the queue.

Actually no, messages are discarded even when there is no backpressure. There is nothing in the implementation to communicate backpressure upstream.

quantenschaum commented on June 27, 2024
  • One could limit the rate at which messages are fed into the buffer using a token bucket, to allow for bursts. The size and rate of the bucket have to be set to reasonable values.
  • Or the rate limiting happens for each handler individually, since each handler has its own processing speed.
  • Additionally, the buffer itself limits the burst size by its capacity. If 500 messages arrive in a burst, only 300 remain in the buffer, but this is ok if the buffer is big enough.

How to detect that a handler is not able to keep up with the rate at which messages arrive in the buffer?

  • If the handler is able to keep up, it will "empty" the buffer eventually; fetch will block and/or return an empty chunk.
  • If a burst of data arrives, the pipeline may remain filled for a certain time,
  • but it must be emptied eventually, within a certain time window, to ensure that the handler is able to process the data faster than it arrives.

I think in Python this could be implemented nicely as a generator function, yielding new messages and doing all of the sequence counter handling internally, such that the handler just gets the messages. I will try to build something...
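A first rough sketch of the generator idea, building on the toy Feeder above (simplified: real code would wait on a condition variable instead of polling, and the flush-on-congestion logic still has to go in):

import time

def get_messages(feeder, chunk_size=10, timeout=1.0):
    # hides the sequence counter from the consumer; yields a chunk
    # (possibly empty) at the latest after `timeout`, so the consumer
    # can still check its stop flag regularly
    seq = feeder.seq
    while True:
        deadline = time.monotonic() + timeout
        seq, chunk = feeder.fetch(seq, chunk_size)
        while not chunk and time.monotonic() < deadline:
            time.sleep(0.05)  # placeholder for a proper condition variable
            seq, chunk = feeder.fetch(seq, chunk_size)
        yield chunk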

wellenvogel commented on June 27, 2024

The concept now is rather simple:
Each queue user can decide about the chunk size that it allows.
This CAN mean that it will process older messages if the chunk size is large.
And as said - if its external partner does not consume fast enough, it will just skip, as the feeder does.
And as I already wrote:
If you want to prevent messages from being too old, the only chance is to go with an enqueue timestamp and drop messages being too old.
I would not go for separate limiters at the readers - this will just make things more complex, and I would not see any benefit.
And as I wrote - I would avoid backpressure at the reader (filler) side, as you lose control over the message ages.
At the end the problem is at the fetch side.
If some "fetcher" is unable to keep up, it needs a strategy.
Until now there have been no real issues in the field - but it's always nice to improve...
So I will go for an implementation with an enqueue timestamp, max chunk being equal to the queue size by default, and an algorithm to drop messages being too old.
Good chance to also make progress for #250.
And the enqueue stamp can also become the timestamp in the store - more accurate.
Most probably it makes sense to have a default timeout and give the fetcher a chance to have a different one.
But I would like to keep the fetcher API as it is (although I agree it could be more elegant) - finally it's also visible at the plugin API.
And I guess the same implementation will make sense for the Android part.

quantenschaum commented on June 27, 2024

Hmmm, maybe - have a look at my code when it's ready and let's discuss again. I don't think it makes sense to discard messages right away if a burst occurs; the messages should only be discarded if they cannot be processed fast enough. You don't need timestamps on each message for this, just monitor the number of unprocessed messages.

But putting timestamps on the messages when they are put into the buffer would be good anyway, because these are the actual arrival timestamps. Currently the timestamp is assigned too late, during processing.

wellenvogel commented on June 27, 2024

Hmmm, maybe - have a look at my code when it's ready and let's discuss again. I don't think it makes sense to discard messages right away if a burst occurs; the messages should only be discarded if they cannot be processed fast enough. You don't need timestamps on each message for this, just monitor the number of unprocessed messages.

Not what I proposed...
Messages will simply go into the queue, and when fetching they will be dropped if too old.
I don't think that there is any other real chance. The number of messages does not really help. There are many fetchers and each of them will most probably work at a different rate - depending potentially on the external partner.
So some will potentially be able to process e.g. 300 messages within 3 seconds - others will not.
With the proposed handling we will have an algorithm that can handle bursts (however you name it - basically a bucket) and that on the other hand fulfills the requirement of limited message age.
And before going into coding I would like to have a clear concept.

quantenschaum commented on June 27, 2024

Wait for the code, please; it shows the concept and is actually simple. All messages go into the buffer. If there are too many, it may overflow and old messages are lost - that's how it is, and it prevents huge bursts from killing the system. Now I want to drop messages per handler as currently implemented, but monitor the size of each handler's own pipeline and discard messages when the handler is not able to empty its own pipeline within a given time. This allows bursts to be handled without dropping messages directly, but also ensures that messages are processed within time X or dropped.

wellenvogel commented on June 27, 2024

Basically not really different, except that you need a separate pipeline per handler.
But the enqueue time is nice anyway, as this would be the more appropriate time for the store and the validity period.

quantenschaum commented on June 27, 2024

There is already a pipeline for each handler: each handler tracks its own sequence number, which is a pointer into the ring buffer, so this effectively is one pipeline per handler. There is no need to copy the messages into a separate list for each handler - the sequence number contains this information. But the handler should not bother with this counter; that could be encapsulated elsewhere.

wellenvogel commented on June 27, 2024

Sure.

quantenschaum commented on June 27, 2024

Have a look, please: get_messages()

wellenvogel commented on June 27, 2024

Hi Adam,
I always appreciate your input + nice ideas.
This helps a lot.
But please accept that I will finally decide how I will implement certain parts - at the end I have to feel confident with the implementation. And if something goes wrong, people will ask me - so I have to be sure what is in there.
Small remarks to your code:
(1) It does not consider time spent outside get_messages. So if the user would spend a reasonable amount of time outside, messages could start aging.
(2) This code is no solution for the current users - it would require all code to be changed to use this new method.
This could at some point make the code more complex, because you would need to move the code to a more event-driven handling (the yield). No problem for the decoder, but more difficult for all the writers. They have to maintain their sending side (connection handling, errors, ...). And you should not spend more than 1 s in any method, to be able to handle a stop of the handler thread correctly.
(3) Plugins would have to be rewritten to utilize this new method. This is a no-go for me. I would like to keep the plugin API stable at this point.
(4) Assertions are not used in the code until now - this would first require a clear concept. And all errors should be handled in a way that allows the server to continue (i.e. dedicated recovery from the error). This is the basic concept everywhere in AvNav (although potentially not always completely tested...).
So what I will do:
I will take our discussion as input and go for a compatible implementation using enqueue timestamps.

quantenschaum commented on June 27, 2024

Of course, I do not want to and cannot force you to implement anything. How could I? I would like to convince you of my solution and/or learn in the discussion why it is bad. And even if my code ends up in the trash, I learnt something from it - this is why I am digging into it. It's interesting to think about and play with that stuff.

I am really impressed with what you've built here so far, basically as a one-man show. I recently discovered AvNav and really like it. I would be happy if I could contribute to the project. It's very interesting that people often are quite reluctant to accept changes. I can understand that to a certain extent, and there are things that they know, which I don't, that are important to consider. This is where I am happy to learn something. But often their arguments do not really convince me. Having a second AvNav in a fork that diverges more and more doesn't make any sense and is just confusing, so it would be very nice to get this merged somehow.

I would like to answer your points.

  1. It does consider the time spent outside; that was the primary idea behind implementing it as a generator function.
  2. Well, in every framework you will reach a point where you have to change the API, introduce a new method, deprecate and eventually remove the old one, and adapt the code to use the new API. There are 3 places in the code where fetchFromHistory() is called, AFAICS. So what? And both methods could live there in parallel for compatibility for a while. The generator actually makes the code much simpler, because the handler just gets chunks of data with no need to track the sequence number or care about the internals of the implementation. IMHO this is a quite Pythonic way to do it, nothing special or event-driven about it. Why would the writer not be able to use the generator? The time spent waiting for new data in the generator can be adjusted via the timeout. This is not different from waitTime in fetchFromHistory(). I don't see the point here.
  3. I can understand that, but I would do it (see 2) to resolve the issue. How many plugins are we talking about, 5 or 5000? Maybe there is also a way to keep the old API and change it internally?
  4. Assertions should be used generously, especially if there are no unit tests. They are really helpful during development and live testing and reveal bugs, and you can disable them. They must not trigger in production, or there is a bug. Do you have a concept for exceptions in general? Do you have a concept for everything? Is that written down? We know how that works here: we have an idea of how it should work, but not a well-defined concept, be honest. What about assertions in external libraries? And there is already a catch-all in feeder.run().
  5. Using timestamps on the incoming messages is good in general, but may not solve this problem. Discarding all messages older than x seconds won't remove the congestion. You just remove old messages, but the queue can still contain more young messages than the handler can process. IMHO you need to flush the queue completely if it could not be processed entirely within a certain time, otherwise the processing will always lag behind. The handler needs to consume the messages faster than they arrive. This will lead to an empty pipeline eventually. Now we just require that this happens within a specified time, otherwise the queue is flushed. The timestamps of the messages do not actually matter here. This is what I have implemented.

I somehow can understand your feeling about it; maybe I am pushing this a bit too much and am writing too much, I know, don't bother. But what you say does not really convince me that my proposal is a bad solution or even a no-go. It does actually work.

wellenvogel commented on June 27, 2024

I'm always interested in discussions - they can only make things better.
And as said - I'm happy if someone is digging in that deep.
But there are some - how to say? - rules that I try to follow. Not written down - until now there was no need for that, as it's just for me.
A couple of them I already mentioned:
(1) API stability. I know - everything can be changed. On the other hand it's typically also not that much of an issue to keep APIs - if you simply decide that this has some priority for you. Unfortunately a lot of open source projects do exactly suffer from that. At least for me it's always a pain if after some time APIs have changed completely and you have to redo a lot of things. So I try to avoid this if possible. If the only drawback is that the code is not that elegant - I can easily live with it.
(2) Error recovery:
Most users of AvNav have no real idea about the client/server stuff and what happens behind it. Therefore one major focus is to keep the server up and running all the time. And for sure I cannot guarantee this for all used libs (I try to keep the number low anyway) - but until now the results are good.
And a simple error handling rule:
If you feel that an error could arise at a certain point - better think about a recovery strategy for it. At the end I do not blame assert at all - it's just about having a clear strategy on how to use it, and what to do in production if the situation arises.
And something like a final catch-all is already important - but it most probably does not repair the problem, so you can easily get stuck in an error loop.

I now have a first implementation that only needs a couple of smaller changes.
And I found that it is still completely ok to remain at small chunks, but simply to utilize the whole history.
So the flow is basically simple:
As long as my next requested sequence is still in the queue, I fetch from there but skip the outdated messages. And only if I'm too slow for longer times will my requested sequence (potentially) not be in the queue any more.
So if we get a burst, we can easily store at least queue-size messages - and if the handlers are fast enough to handle them within the allowed interval, everything is fine.
If not - older ones will simply be skipped and will age out.
I will just push my implementation (not fully tested yet). And there are some tunables at the feeder - so you can easily increase the queue length for larger bursts.
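Paraphrasing that flow in toy code (based on the sketches above; the (timestamp, message) entries and the 3 s limit are my assumptions, not the actual implementation):

import time

MAX_AGE = 3.0  # max seconds a message may sit in the queue before it is skipped

def fetch(feeder, consumer_seq, chunk=10):
    oldest = feeder.seq - len(feeder.buffer)
    i = max(consumer_seq, oldest)            # jump forward if we fell out of the queue
    msgs = []
    now = time.monotonic()
    while i < feeder.seq and len(msgs) < chunk:
        ts, msg = feeder.buffer[i - oldest]  # entries carry an enqueue timestamp
        if now - ts <= MAX_AGE:              # outdated messages are simply skipped
            msgs.append(msg)
        i += 1
    return i, msgs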

quantenschaum commented on June 27, 2024
  1. How much code out there actually depends on AvNav? I think this is manageable.
  2. Error recovery remains an issue, independent of the use of assertions. An except: pass without logging is a nightmare to debug.
  3. Your implementation fits your bill, that's fine. It resolves the issue somehow, but it does not solve point 5 of what I wrote. What happens is that you feed messages with a constant delay of up to maxAge. Do we really want that, or should the queue rather be flushed? And you have to do a lot of looping in the fetch call. My proposal computes the indices directly.
  4. Code elegance is always a compromise, of course, but I personally prefer the generator because it's straightforward to use. Returning the sequence and the data as a tuple and having to pass the sequence in again on the next call is, frankly, quite ugly. This is what generators were invented for in Python.

I would recommend flushing the queue completely.

wellenvogel commented on June 27, 2024

Lengthy discussion...
Ok, you did not consider my concerns/priorities - but this is up to you.
The remaining real question:
Should the queue be completely flushed when a consumer is not able to keep up?
Which situation would be improved?
Only the situation where a consumer is constantly too slow.
With my approach it will process delayed messages, and at some point in time it will skip.
With your approach, from time to time it would pick up an arbitrary amount of relatively new messages (they could have been sitting in the queue for long times already if the consumer spends a reasonable time between two calls of get_messages).
And as the consumer is too slow, this scenario will repeat forever (in both cases).
Both are not really great - but from my POV the real control over message ages is a plus.

And for the code elegance:
You could easily wrap the existing dequeue API with a small class providing the generator pattern or a simpler dequeue API - see the sketch below.
And from the usability:
Where would you handle thread-stop checks, error handling, opening/closing of your external connection?
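Such a wrapper might look roughly like this (a sketch only; the fetchFromHistory signature with maxEntries/waitTime is assumed from the names mentioned in this thread):

class MessageSource:
    # small adapter: generator pattern on top of the existing dequeue-style API
    def __init__(self, feeder, chunk_size=10, wait_time=1.0):
        self.feeder = feeder
        self.chunk_size = chunk_size
        self.wait_time = wait_time

    def __iter__(self):
        seq = 0
        while True:
            # assumed signature, following the snippets in this thread
            seq, chunk = self.feeder.fetchFromHistory(
                seq, maxEntries=self.chunk_size, waitTime=self.wait_time)
            yield chunk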

quantenschaum commented on June 27, 2024

Your solution is ok; it solves the current issue and is better than before. If a consumer is constantly too slow, it will not work well anyway, that's true. In either case messages have to be dropped. So you're definitely losing some data.

Concerning your priority of a stable API: I would prioritize differently, right. Maybe it's better not to change it. But I personally would change it, since there are not hundreds of projects out there depending on it. This is your decision, of course. But using assertions really helps getting your code right with live tests (like the index computation). They must not trigger in production anyway. If an assertion would trigger and you just remove it, because you don't want to have assertions in the code, then there will be some other exception or unwanted behavior that is hard to find. I don't get the point of not wanting assertions in the code: they reveal bugs, you fix them, and on it goes. And error handling has to be done anyway, with or without assertions.

With your approach, from time to time it would pick up an arbitrary amount of relatively new messages (they could have been sitting in the queue for long times already if the consumer spends a reasonable time between two calls of get_messages).

This is actually not true! It seems to me you are not really familiar with Python's yield? Is there a misunderstanding? get_messages is only called once in the consumer (except if the consumer restarts). If the queue was not emptied within discard_time (not "from time to time"), it returns the chunk_size (not arbitrary) most recent messages (thus younger than discard_time if new messages arrived - otherwise the input stream has stopped anyway) and leaves an empty queue. The idea is to remove the congestion entirely and start over. If it was a temporary problem, it is then resolved and "fresh" data is fed without delay.

Your approach is somewhat similar now, but it only discards old messages (from time to time); it does not force the client to get the newest messages and leave an empty queue. After messages have been discarded, your implementation returns the next chunk of messages not older than specified, but these could actually "be sitting in the queue" for up to maxAge, especially "if the consumer spends a reasonable time between two calls of fetchFromHistory".

When using AvNav as an NMEA multiplexer, congestion might occur (temporarily) at the outputs (serial or network). If that happens, your implementation could cause constantly delayed messages to be sent to the outputs. If backpressure builds up, too-old messages are discarded, but messages are still sent with a delay of up to maxAge. As a user I would like to get messages from the output with as little delay as possible. So some backpressure is tolerated; if it gets over the threshold, the queue is flushed and undelayed messages are sent. You have to discard data in either case.

but from my POV the real control over message ages is a plus.

Having an arrival timestamp on the messages is good if it is used downstream in the pipeline. The congestion problem could be solved without it.

Where would you handle thread-stop checks, error handling, opening/closing of your external connection?

Exactly as in your existing code. Have you seen what I wrote? (get_messages yields (empty) chunks at least every timeout.)

All you do is replace

seq = 0
while not should_stop():
    seq, chunk = fetchFromHistory(seq)
    for m in chunk:
        process(m)

with (getting rid of seq)

for chunk in get_messages():
    if should_stop():
        break
    for m in chunk:
        process(m)

Now try-except blocks can be added to catch and log exceptions and keep it going, as it is currently done.

The way the ring buffer is implemented, using a single int to track the message id, is really clever. That only works because Python has arbitrarily sized ints; it does not roll over. If it did, that might be a problem if you relied on the absolute value of the sequence counter, but differences would always be correct, even across a rollover. But since multiple clients are reading from it and need to track their own progress, the sequence is returned with the data and passed in again. It works, but it is confusing when reading the code and looks to me a bit like a desperate workaround that now is "carved into stone" because you don't want to change the API.
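To illustrate the rollover remark: with a fixed-width counter the absolute values become ambiguous after a wrap, but differences taken modulo the width stay correct. A toy example with a pretend 8-bit counter:

WIDTH = 2**8               # pretend the sequence counter were 8 bits wide

old = 250
new = (old + 10) % WIDTH   # the counter wrapped: new == 4

print(new - old)           # -246: comparing absolute values goes wrong
print((new - old) % WIDTH) # 10: the difference survives the rollover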

There is actually an easy way to make fetchFromHistory use get_messages internally.

wellenvogel commented on June 27, 2024

Thanks for the explanations.
I guess I understand what you are doing and how yield works.
Finally it's simply turning the current interface upside down. Maybe some people (like you) will find this easier to use. Others (like me) would like to stick with the current dequeue-style API.
And I agree - one can be converted into the other.
I guess it does not really make sense to further discuss until we find new points.
Btw, for the rollover:
Even if we assume 10000 messages per second, a 64-bit integer would need a huge number of years to roll over...

quantenschaum commented on June 27, 2024

It's ok, you want to keep it your way.

turning the current interface upside down

This is quite an exaggeration looking at the example in my previous post.

But nice that AvNav now can cope with bursts of input data.

quantenschaum commented on June 27, 2024

I think this is fixed now with 58e850b.
