Comments (18)

eao197 commented on May 17, 2024

Hi!

Delayed messages are meant for this kind of thing:

struct no_image_timeout final : public so_5::signal_t {};
...
so_subscribe(channel).event([](cv::Mat image) {...});
so_subscribe_self().event([](mhood_t<no_image_timeout>) { cv::destroyAllWindows(); });
...
// Start timer.
so_5::send_delayed<no_image_timeout>(*this, 200ms);

Please note also that states can have timeouts and there is no need to use delayed messages for controlling time spent in a particular state:

class images_receiver : public so_5::agent_t {
  state_t st_wait_next_image{this};
  state_t st_no_images{this};
...
  void so_define_agent() override {
    st_wait_next_image
      .event(channel, [](cv::Mat image) {...})
      .time_limit(200ms, st_no_images);
    st_no_images
      .on_enter([]{ cv::destroyAllWindows(); });
    this >>= st_wait_next_image; // Internal timer will be fired automatically.
  }
};

from sobjectizer.

ilpropheta commented on May 17, 2024

Hi, thanks for your reply.
Actually, this is not exactly what I need: I receive images continuously and I want to detect when no image has arrived for some time (e.g. 200ms). In that case I perform an action (e.g. destroy all windows) and go back to a listening state (in case the stream resumes and I need to start drawing again).
However, I don't want to destroy the windows while the images are arriving: this would be both inefficient and counterintuitive.

I think the solution based on delayed messages needs some extra handling: anytime an image is received, we send a delayed message. When the delayed message handler is entered, we check if last time we got an image was more than 200ms ago.
This should work but I think it's convoluted.

On the other hand, the solution with states is very close and it would work only if the internal timer was reset every time the image handler was entered. In other words:

void so_define_agent() override
{
        st_wait_next_image
                .event(m_source, [](cv::Mat image) { imshow(image); }) // to work, time_limit should be reset here
                .time_limit(200ms, st_no_images);
        st_no_images
            .on_enter([] { cv::destroyAllWindows(); })
            .time_limit(1ms, st_wait_next_image); // this has been added to resume listening in case the stream gets resumed
		
        this >>= st_wait_next_image;
}

I tried this one and it seems to work:

void so_define_agent() override
{
        st_wait_next_image
                .event(m_source, [this](cv::Mat image) { 
                    imshow(image); 
                    st_wait_next_image.time_limit(200ms, st_no_images); 
                });
        st_no_images
            .on_enter([this] { 
               cv::destroyAllWindows(); 
               st_wait_next_image.drop_time_limit(); // don't know why this is needed
            })
            .time_limit(1ms, st_wait_next_image);
	
        this >>= st_wait_next_image;
}

I don't understand why a call to st_wait_next_image.drop_time_limit() inside the second event handler is needed. If I omit that, the destroy handler is called multiple times in a row.

Also, I don't know the impact on performance when calling time_limit repeatedly from the first handler. Actually, the doc says:

If S.time_limit() is called when S is active state then time_limit for that state will be reset and time for the state S will be counted from zero.

So I have supposed this is intended.

Also, I cannot judge if the solution overall is correct or I can expect some subtle edge cases. What's your feeling?

Another idea I had was based on message chains. Pseudo-code:

void so_evt_start() override
{
	bool closed = false;
	while (!closed)
	{
		auto res = so_5::receive(from(m_chain)
			.handle_all()
			.empty_timeout(200ms), [this](cv::Mat image) {
				imshow(image);
			});

		closed = res.status() == so_5::mchain_props::extraction_status_t::chain_closed;
		// if here, either the chain has been closed or the timeout has been fired: just destroy the windows
		cv::destroyAllWindows();
	}
}

However, I would be happier with message boxes.

Please let me know your thoughts!


eao197 commented on May 17, 2024

Hi!

st_wait_next_image.drop_time_limit(); // don't know why this is needed

I'm surprised that this call is necessary. Let me investigate this case.


ilpropheta commented on May 17, 2024

Hi @eao197 , I think I got it.

When the second handler is executed, it moves the agent state to st_wait_next_image after 1ms:

st_no_images
            .on_enter([this]
            {	        
                ...
                this->st_wait_next_image.drop_time_limit();
            })
           .time_limit(1ms, st_wait_next_image); // move back to listening

However, I think the agent is still configured to spend no more than 200ms in that state:

st_wait_next_image
            .event(m_source, [this](cv::Mat image) {
                    ....
                   st_wait_next_image.time_limit(200ms, st_no_images); // still affecting the state?
           });

Since in my experiments I was simulating a streaming pause, after moving back to st_wait_next_image the corresponding handler didn't get called and so no new call to st_wait_next_image.time_limit(200ms, st_no_images) was done. The timer is elapsing all the time, isn't it?

Is this a possible explanation?


eao197 commented on May 17, 2024

If your test code looked like:

st_wait_next_image
  .event(m_source, [this](cv::Mat image) {
    ...
    std::this_thread::sleep_for(500ms);
    st_wait_next_image.time_limit(200ms, st_no_images);
  })

Then the following actions happened under the hood:

  • some internal message state_timed_out_1 was sent as a delayed message to the agent, the delay was 200ms;
  • this delayed message was fired by the timer when the agent was in sleep_for call, so the message was stored in the agent's queue;
  • subscription for state_timed_out_1 was destroyed during the call to st_wait_next_image.time_limit inside your event-handler. This call also sent a new delayed message state_timed_out_2 with the delay 200ms;
  • when your event-handler completed an instance of state_timed_out_1 was extracted from the queue and should be thrown out because the subscription for it was destroyed inside the time_limit call.

So there should be a place for several calls to st_no_images.on_enter in a row.

I'll continue the investigation.


eao197 commented on May 17, 2024

I still don't understand why a separate call to st_wait_next_image.drop_time_limit was necessary. I can't even reproduce such behavior in my local tests.

But let me give some answers to your questions.

Historically, the simplest and most obvious way to handle such scenarios was to use delayed messages, but sent via send_periodic to get a timer_id instance. Something like:

class image_receiver : public so_5::agent_t {
  struct no_images final : public so_5::signal_t {};

  so_5::timer_id_t m_no_images_timer;
  ...
  void so_define_agent() override {
    so_subscribe(m_source).event([this](cv::Mat image) {
      m_no_images_timer = so_5::send_periodic<no_images>(*this, 200ms, 0ms); // 0ms as period makes the message delayed.
      ... // Image processing.
    });
    so_subscribe_self().event([this](mhood_t<no_images>) {...});
  }

  void so_evt_start() override {
    m_no_images_timer = so_5::send_periodic<no_images>(*this, 200ms, 0ms); // 0ms as period makes the message delayed.
  }
};

This approach is the most efficient because no helper objects are created.

But this approach has a hidden pitfall.

Let's assume there is a cv::Mat message in the queue. The agent gets it and resends a delayed message no_images then starts image processing. At that point in time, a new cv::Mat message is pushed into the agent's queue. So the agent works on the previous message and has a new message in the queue.

Let's assume that the processing of that message takes longer than 200ms. The timer sends an instance of no_images to the agent's queue.

So now we have cv::Mat and no_images in the queue.

The agent finishes the processing of the current message, extracts the next cv::Mat, and starts the processing of the extracted message. We want to reschedule the delayed no_images signal because we have a new image to process.

But no_images is already in our queue.

So when the agent completes the current processing it receives old no_images immediately.

And that isn't what we want in most cases.
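The sequence above can be replayed without SObjectizer at all. The following is only a library-free sketch of the ordering problem: the `msg` enum and the `replay_stale_timeout` function are hypothetical names invented for illustration, and a plain `std::deque` stands in for the agent's event queue.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <vector>

// Hypothetical message kinds standing in for cv::Mat and the no_images signal.
enum class msg { image, no_images };

// Replays the scenario and returns the order in which the handlers run.
std::vector<std::string> replay_stale_timeout() {
    std::deque<msg> queue{msg::image};  // the first image is already waiting
    std::vector<std::string> log;
    bool first_image = true;
    while (!queue.empty()) {
        const msg m = queue.front();
        queue.pop_front();
        if (m == msg::image) {
            log.push_back("image");
            // The handler re-arms the timer at this point, but that cannot
            // remove a no_images signal that is already sitting in the queue.
            if (first_image) {
                // Slow processing of the first image: the next image arrives,
                // then the timer armed for the first image fires.
                queue.push_back(msg::image);
                queue.push_back(msg::no_images);
                first_image = false;
            }
        } else {
            log.push_back("no_images");  // stale: an image was just handled
        }
    }
    assert(log.size() == 3);
    return log;
}
```

The returned log is image, image, no_images: the timeout handler runs right after an image was handled, which is the unwanted case.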


eao197 commented on May 17, 2024

There are several tricks for avoiding that problem.

The first one is used in the current implementation of time_limit for agent's states.

It's based on the usage of an additional mbox. A new mbox is created for every new call of send_delayed. And a new subscription is made for that mbox. Something like that:

auto tmp_mbox = so_environment().create_mbox();
so_subscribe(tmp_mbox).event([this](mhood_t<no_images>) {...});
auto timer_id = so_5::send_periodic<no_images>(tmp_mbox, 200ms, 0ms);

where tmp_mbox and timer_id are stored in an additional struct.

The timer cancelation is now more complex:

// Subscription must be destroyed.
so_drop_subscription<no_images>(tmp_mbox, current_state);
// Timer should be cancelled too.
timer_id.reset();

In that case, even if no_images is already in the queue, it will be ignored because it was sent to tmp_mbox and we've dropped the subscription for that mbox.

This approach is reliable but it requires the allocation of some additional data: a temporary mbox, an object for holding the temporary mbox and timer_id, and the subscription.

Usually, it isn't a problem, but it is better to know about that price.
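Why a dropped subscription makes an already queued signal harmless can be modelled in a few lines. This is a library-free sketch, not SObjectizer code: the `per_timer_mbox_demo` class and its methods are hypothetical, and plain integer ids stand in for the temporary mboxes.

```cpp
#include <cassert>
#include <deque>
#include <set>

// Hypothetical stand-in for an agent whose every timeout goes to a fresh mailbox id.
class per_timer_mbox_demo {
    std::set<int> subscribed_;  // mailbox ids that still have a subscription
    std::deque<int> queue_;     // ids of timeout signals already fired into the queue
    int next_id_ = 0;
    int handled_ = 0;

public:
    // Arm a timer: create a fresh mailbox id and subscribe to it.
    int arm() {
        subscribed_.insert(next_id_);
        return next_id_++;
    }
    // The timer fires: the signal lands in the agent's queue.
    void fire(int id) {
        assert(id >= 0 && id < next_id_);
        queue_.push_back(id);
    }
    // Cancel: drop the subscription. A queued signal stays in the queue,
    // but no handler can be found for it anymore.
    void cancel(int id) { subscribed_.erase(id); }
    // Drain the queue; returns the total number of signals that reached a handler.
    int drain() {
        while (!queue_.empty()) {
            if (subscribed_.count(queue_.front()) != 0) ++handled_;
            queue_.pop_front();
        }
        return handled_;
    }
};
```

If a timer is armed, fired and then cancelled before the queue is drained, `drain()` reports zero handled signals; a second timer that is armed and fired without being cancelled is handled normally.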


eao197 commented on May 17, 2024

The second trick is implemented in so5extra's revocable_timer submodule.

It uses a special envelope with an atomic boolean flag inside. When a delayed message is revoked this flag is set. Then it is checked just before the invocation of the event handler. If the flag is set (it means that the message is revoked) then the message is ignored.

Such a trick allows revoking timers even if the timer has already fired the message and that message is waiting in the receiver's queue.

This approach requires just one additional allocation: for an envelope with a boolean flag inside. So it's more efficient than the usage of an additional mbox.
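The flag-in-an-envelope idea can be sketched without so5extra. The code below is only an illustration of the principle and does not use the revocable_timer API: `envelope`, `revocable_handle` and `deliver_all` are hypothetical names, and a `std::deque` stands in for the receiver's queue.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical envelope: a payload plus a shared "revoked" flag.
struct envelope {
    int payload;
    std::shared_ptr<std::atomic<bool>> revoked;
};

// Hypothetical handle kept by the sender; revoking sets the shared flag.
struct revocable_handle {
    std::shared_ptr<std::atomic<bool>> revoked =
        std::make_shared<std::atomic<bool>>(false);
    void revoke() { revoked->store(true, std::memory_order_release); }
};

// Delivers all queued envelopes and returns how many reached a handler.
int deliver_all(std::deque<envelope>& queue) {
    int handled = 0;
    while (!queue.empty()) {
        envelope e = queue.front();
        queue.pop_front();
        assert(e.revoked != nullptr);
        // The flag is checked right before the handler would be invoked,
        // so revocation works even for a message that is already queued.
        if (!e.revoked->load(std::memory_order_acquire)) ++handled;
    }
    return handled;
}
```

With two envelopes in the queue and the first one revoked before delivery, only the second one is counted as handled.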


eao197 commented on May 17, 2024

Since in my experiments I was simulating a streaming pause, after moving back to st_wait_next_image the corresponding handler didn't get called and so no new call to st_wait_next_image.time_limit(200ms, st_no_images) was done.

Could you show how you did such a simulation?


ilpropheta commented on May 17, 2024

Thanks for the details and for explaining other ways to do this. The solution with the extra call to st_wait_next_image.drop_time_limit seems to work as I expect but I am open to other approaches.

Simulating the delay is simply a sleep between sending some images to the message box. Images are pushed from the main thread for simplicity, whereas the receiver agent has a dedicated one. So this_thread::sleep_for is not called from the event handler.


eao197 commented on May 17, 2024

Images are pushed from the main thread for simplicity, whereas the receiver agent has a dedicated one. So this_thread::sleep_for is not called from the event handler.

I wrote a small test program and can't reproduce the issue that requires the additional call to drop_time_limit:

#include <so_5/all.hpp>

#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

class a_test_t final : public so_5::agent_t
{
	state_t st_wait_next_image{ this, "wait_next_image" };
	state_t st_no_images{ this, "no_images" };

public:
	struct next_image final : public so_5::signal_t {};

	a_test_t( context_t ctx ) : so_5::agent_t{ std::move(ctx) } {}

	void so_define_agent() override
	{
		st_wait_next_image
			.on_enter( []() { std::cout << "entering st_wait_next_image" << std::endl; } )
			.event(
				[this](mhood_t<next_image>) {
					std::cout << "on next_image" << std::endl;
					st_wait_next_image.time_limit( 200ms, st_no_images );
				} )
			.time_limit( 200ms, st_no_images );

		st_no_images
			.on_enter( []() { std::cout << "no images." << std::endl; } )
			.time_limit( 1ms, st_wait_next_image );

		this >>= st_wait_next_image;
	}
};

int main()
{
	so_5::mbox_t test_mbox;
	so_5::wrapped_env_t sobj{
		[&]( so_5::environment_t & env ) {
			auto test_agent = env.make_agent< a_test_t >();
			test_mbox = test_agent->so_direct_mbox();
			env.register_agent_as_coop( std::move(test_agent) );
		},
		[]( so_5::environment_params_t & params ) {
			params.message_delivery_tracer( so_5::msg_tracing::std_cerr_tracer() );
		}
	};

	auto pause_then_send = [test_mbox]( auto pause ) {
		std::cout << "=" << pause.count() << "ms" << std::endl;
		std::this_thread::sleep_for( pause );
		so_5::send< a_test_t::next_image >( test_mbox );
	};

	const auto pauses = {
			100ms, 150ms,
			200ms, 250ms,
			300ms, 350ms,
			400ms, 450ms,
			500ms, 550ms,
			600ms, 650ms,
			700ms, 750ms
		};

	for(;;)
	{
		for( const auto p : pauses )
			pause_then_send( p );
	}

	return 0;
}

Can you run your code with message-delivery-tracing turned on? It will be very interesting to see messages like those:

[tid=139983363495680][mbox_id=9] deliver_message.push_to_queue [msg_type=N4so_57state_t12time_limit_t7timeoutE][signal][overlimit_deep=1][agent_ptr=0x7f5064001760]
[tid=139983371888384][agent_ptr=0x7f5064001760] demand_handler_on_message.find_handler [mbox_id=9][msg_type=N4so_57state_t12time_limit_t7timeoutE][signal][state=no_images][evt_handler=0x7f5064001a68]
[tid=139983371888384][agent_ptr=0x7f5064001760] state.leaving [state=no_images]
[tid=139983371888384][agent_ptr=0x7f5064001760] state.entering [state=wait_next_image]


eao197 commented on May 17, 2024

The solution with the extra call to st_wait_next_image.drop_time_limit seems to work as I expect but I am open to other approaches.

Another simple way to handle delayed messages safely is the following:

class image_receiver : public so_5::agent_t {
  struct no_image_timeout {
    unsigned long long m_counter;
  };

  unsigned long long m_timeout_counter{}; // Will be incremented on every send.

  // Just a helper method.
  void prolonge_timeout() {
    so_5::send_delayed<no_image_timeout>(*this, 200ms, ++m_timeout_counter);
  }
  ...
  void so_define_agent() override {
    so_subscribe(m_source).event([this](cv::Mat image) {
      ... // Actual handling.
      prolonge_timeout();
    });
    so_subscribe_self().event([this](no_image_timeout cmd) {
      if(m_timeout_counter != cmd.m_counter)
        // This is outdated message. Ignore it.
        return;
      ... // Handling the absence of image.
    });
  }
  void so_evt_start() override {
    prolonge_timeout();
    ...
  }
};

Also, if you decide to use the approach with the st_wait_next_image and st_no_images states, add an event handler for a new image to the st_no_images state. For example:

st_wait_next_image
  .event(...)
  .event(...)
  .time_limit(...)
  ...;
st_no_images
  .on_enter(...)
  .time_limit(...)
  .transfer_to_state<cv::Mat>(st_wait_next_image);


ilpropheta commented on May 17, 2024

Hi,
I have wrapped up a small (and ugly) program to reproduce the issue (I have used an extra thread for producing "images"). Possibly it's my fault in doing something, let me know:

#include <so_5/all.hpp>

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

using namespace std;

class FakeImageProducer
{
public:
    explicit FakeImageProducer(so_5::mbox_t dst)
        : m_destination{ std::move(dst) }
    {
    }

    ~FakeImageProducer()
    {
        if (m_thread.joinable())
            m_thread.join();
    }

    void Play()
    {
        m_pause = false;
        m_thread = std::thread([&] {
            while (!m_pause)
            {
                std::this_thread::sleep_for(60ms);
                so_5::send<int>(m_destination, m_counter++);
            }
        });
    }

    void Pause()
    {
        m_pause = true;
    }

    void StopAndWait()
    {
        m_pause = true;
        m_thread.join();
    }

private:
    std::thread m_thread;
    so_5::mbox_t m_destination;
    std::atomic<bool> m_pause;
    int m_counter = 0;
};

class FakeImageReceiver : public so_5::agent_t
{
public:
    FakeImageReceiver(context_t ctx)
        : so_5::agent_t(ctx), m_source(ctx.env().create_mbox("main.images"))
    {}

    state_t st_wait_next_image{ this };
    state_t st_no_images{ this };

    void so_define_agent() override
    {
        st_wait_next_image
            .event(m_source, [this](int image) {
                std::cout << "got an image " << image << "\n";
                this->st_wait_next_image.time_limit(200ms, this->st_no_images);
            })
            .time_limit(200ms, st_no_images);
        st_no_images
            .on_enter([this] {
                std::cout << "no images arrived by 200ms...\n";
                this->st_wait_next_image.drop_time_limit();  //// !!!!!! THIS ONE MAKES THE DIFFERENCE
            })
            .time_limit(1ms, st_wait_next_image);

        this >>= st_wait_next_image; // Internal timer will be fired automatically.
    }
private:
    so_5::mbox_t m_source;
};

int main()
{
    so_5::wrapped_env_t sobj{ [](so_5::environment_t& env) {
        },
        [](so_5::environment_params_t& params) {
            params.message_delivery_tracer(so_5::msg_tracing::std_cout_tracer());
         } };
    auto& env = sobj.environment();
    auto coop = sobj.environment().make_coop(so_5::disp::active_obj::make_dispatcher(sobj.environment()).binder());

    auto fakeReceiver = coop->make_agent<FakeImageReceiver>();
    FakeImageProducer producer{ env.create_mbox("main.images") };

    env.register_coop(move(coop));

    producer.Play();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    producer.StopAndWait();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "resuming\n";
    producer.Play();
    std::this_thread::sleep_for(std::chrono::seconds(2));
    producer.StopAndWait();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "ending...\n";
}

Attaching the tracing files:

Please let me know your thoughts.

I'll have a look at the other solutions you proposed shortly!


eao197 commented on May 17, 2024

Ok, it seems I got the point.

Without an extra call to drop_time_limit the example works as expected:

  • no new images arrive, the timeout elapses, and the state is changed to st_no_images;
  • during the change from st_wait_next_image to st_no_images a new timer is started. A very short one, just 1ms;
  • this short timeout elapses and a new switch happens: this time from st_no_images to st_wait_next_image;
  • during the change a new timer is started, this time for 200ms;
  • because there are no images, the timeout elapses, and a new switch happens: from st_wait_next_image to st_no_images;
  • and so on until a new image arrives.

That is why you see several prints:

no images arrived by 200ms...
no images arrived by 200ms...
no images arrived by 200ms...
no images arrived by 200ms...
no images arrived by 200ms...
no images arrived by 200ms...
resuming

It's just a ping-pong of switching states between st_wait_next_image and st_no_images.

But when you add an extra call to drop_time_limit you change the parameters for st_wait_next_image: there is no time limit now, it's deleted after the first switch to st_no_images. So, when the agent returns from st_no_images to st_wait_next_image no timer is started: there is no time limit to take into account.

That is why there are no new changes from st_wait_next_image to st_no_images after the return to st_wait_next_image. And that is why you see only one print:

no images arrived by 200ms...
resuming

The agent will now wait for an incoming message forever. And you resume ping-pong between states only after receiving an incoming message (because you restore the time limit after that).
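The difference between the two variants can be counted on an idealized timeline. This is only a sketch: `count_no_images_entries` is a hypothetical helper that uses plain integer arithmetic instead of real timers, so it ignores timer resolution and scheduling delays.

```cpp
#include <cassert>

// Idealized timeline (no real timers): counts how many times st_no_images
// would be entered while no image arrives for pause_ms milliseconds.
int count_no_images_entries(int pause_ms, int wait_limit_ms, int back_limit_ms,
                            bool drop_limit_on_enter) {
    assert(wait_limit_ms > 0 && back_limit_ms > 0);
    int now = 0;               // time since the last image, in ms
    int entries = 0;           // number of on_enter calls for st_no_images
    bool limit_active = true;  // st_wait_next_image currently has a time limit
    while (limit_active) {
        now += wait_limit_ms;       // the limit of st_wait_next_image elapses
        if (now > pause_ms) break;  // an image arrived first
        ++entries;                  // on_enter of st_no_images runs
        if (drop_limit_on_enter) limit_active = false;  // models drop_time_limit()
        now += back_limit_ms;       // the short limit of st_no_images elapses
        if (now > pause_ms) break;
    }
    return entries;
}
```

For a 2000ms pause with limits of 200ms and 1ms this model gives 9 entries without the drop and exactly 1 with it. A real run can show a different count for the first case, because actual timers and thread scheduling are not exact.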


ilpropheta commented on May 17, 2024

Thanks for the investigation. Do you think this is safe and efficient, or do you see any major drawbacks? Consider that this is just a troubleshooting utility, not intended for running in production.

You mentioned that I should add a handler for a new image in st_no_images. Do you mean the following?

st_no_images
      .on_enter([this] {
           std::cout << "no images arrived by 200ms...\n";
           this->st_wait_next_image.drop_time_limit(); 
      })
      .event([](cv::Mat image) {
         imshow(image); // in case I get an image at this point, just draw it
      })
      .time_limit(1ms, st_wait_next_image);


eao197 commented on May 17, 2024

You mentioned that I should add a handler for a new image in st_no_images. Do you mean the following?

Not exactly. It seems that you can't call imshow in st_no_images state because you've destroyed windows in on_enter handler. So your code should look something like this:

st_no_images
  .on_enter([this] { ... /* Destroy windows */ })
  .event([this](cv::Mat image) {
    ... // Create new windows.
    imshow(image);
    // Because an image received it's time to switch to st_wait_next_image.
    this >>= st_wait_next_image;
  }); // NOTE: there is no need to limit the time of waiting the first image anymore.

But there is a special marker, transfer_to_state, that changes the current state of the agent and delegates the processing of the incoming message to the new state. So the code above can be rewritten this way:

st_wait_next_image
  .on_enter([this] { ... /* Create necessary windows */ })
  .event([this](cv::Mat image) {
    imshow(image);
  })
  .time_limit(200ms, st_no_images);

st_no_images
  .on_enter([this] { ... /* Destroy windows */ })
  .transfer_to_state<cv::Mat>(st_wait_next_image);

In that scenario, you will wait for a new image for no more than 200ms in the st_wait_next_image state, with unlimited waiting in the st_no_images state. If you are in the st_no_images state and a new cv::Mat arrives, you will be switched to the st_wait_next_image state automatically.
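The resulting behavior can be traced with a tiny model. This is a library-free sketch, not SObjectizer code: `receiver_model` and its members are hypothetical, strings are logged instead of calling OpenCV, and the on_enter call for the initial state is left out for brevity.

```cpp
#include <cassert>
#include <string>
#include <vector>

enum class state { wait_next_image, no_images };

// Hypothetical miniature of the two-state agent; it logs actions instead of drawing.
struct receiver_model {
    state current = state::wait_next_image;
    std::vector<std::string> log;

    // Switch state and run the matching on_enter action.
    void enter(state s) {
        current = s;
        log.push_back(s == state::wait_next_image ? "create windows"
                                                  : "destroy windows");
    }
    // The 200ms limit elapsed; it only exists in wait_next_image.
    void on_timeout() {
        if (current == state::wait_next_image) enter(state::no_images);
    }
    // An image arrived.
    void on_image() {
        // Models transfer_to_state: switch first, then let the new state handle it.
        if (current == state::no_images) enter(state::wait_next_image);
        assert(current == state::wait_next_image);
        log.push_back("imshow");
    }
};
```

Feeding the model an image, a timeout and another image logs imshow, destroy windows, create windows, imshow, so the windows are destroyed once and recreated only when the stream resumes.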


eao197 commented on May 17, 2024

Do you think this is safe/efficient or you see any major drawbacks? Consider that this is just a troubleshooting utility, not intended for running in production.

It seems that the state-based approach is straightforward and understandable enough. I don't think that it'll have a significant performance penalty (this can be measured if needed). At least one drawback should be taken into account: on_enter/on_exit handlers can't throw, so they should catch all exceptions. That makes it hard to propagate an error from the on_enter/on_exit handlers. If this is not an issue then the state-based approach looks good.


ilpropheta commented on May 17, 2024

Great! The limitations are fine for me, so I'll go with this implementation.
Thanks again for your help @eao197! I'll close the issue.

