stiffstream / sobjectizer

An implementation of Actor, Publish-Subscribe, and CSP models in one rather small C++ framework. With performance, quality, and stability proved by years in the production.

Home Page: https://stiffstream.com/en/products/sobjectizer.html

License: Other

Makefile 0.08% CMake 2.46% Ruby 5.48% Batchfile 0.01% C++ 91.98%
cpp cplusplus cplusplus-17 actor-model actors actor-framework actor-library multithreading concurrency concurrent-programming sobjectizer message-passing pubsub publish-subscribe csp communicating-sequential-processes agents actor thread

sobjectizer's Issues

Can't test an agent that creates a child coop

Hello, I am using sobjectizer in my work project and trying to write some tests for a simple agent using the testing_env_t testing environment. I have already tested another agent without issues, but when I try to test an agent that creates child cooperations I get an error.

Some environment info:
sobjectizer 5.7.2 installed from conan
gcc 11

Simple test agent:

class ChildAgent : public so_5::agent_t {
public:
  explicit ChildAgent(context_t ctx) : so_5::agent_t(std::move(ctx)) {}
};

class TestAgent : public so_5::agent_t {
public:
  explicit TestAgent(context_t ctx) : so_5::agent_t(std::move(ctx)) {}

  void so_evt_start() override {
    so_5::introduce_child_coop(*this, [](so_5::coop_t &coop) {
      coop.make_agent<ChildAgent>();
    });
  }
};

Test code looks like this:

TEST_CASE("Simple agent test") {
  so_5::experimental::testing::testing_env_t testing;
  testing.environment().introduce_coop([](so_5::coop_t &coop) {
    coop.make_agent<TestAgent>();
  });

  // After executing the scenario an exception is thrown
  testing.scenario().run_for(std::chrono::milliseconds(100));
}

When I run this test I get an exception from sobjectizer:

SObjectizer event exception caught: (.../sobjectizer/dev/so_5/impl/coop_repository_basis.cpp:108): error(28) a new coop can't be registered when shutdown is in progress; cooperation: {coop:id=2}

When I remove so_5::introduce_child_coop from TestAgent::so_evt_start then all is ok.

Am I doing something wrong? How could I test an agent with a child cooperation created inside it?

Get dispatcher by name from an environment

Hello.

Is it possible to get a dispatcher from an SObjectizer environment by name (as can already be done with mboxes)? It seems this is not available for now.

Why can it be useful? For the same reason as for mboxes: if I want to create an agent on some specific dispatcher, I don't want to pass a dispatcher binder through many layers.

What do you think about such a feature?

Thank you.

A possibility to subscribe an event handler to a base type of a message

NOTE. This issue is just a reminder that SObjectizer needs a solution for use cases like the one described in #24.
I don't know how and when this will be implemented (or whether it can be implemented at all), but the reminder will help to form the scope of a new SObjectizer release when appropriate resources become available.

The task is to allow writing something like that:

class image_base : public so_5::message_t {...};
class image_vendor_A : public image_base {...};
class image_vendor_B : public image_base {...};
class image_vendor_C : public image_base {...};

// The first handler.
void first_handler::so_define_agent() {
  so_subscribe_self()
    // Handles only one type of images.
    .event([this](mhood_t<image_vendor_A> cmd) {...})
    // All other messages are just logged.
    .event([this](mhood_t<image_base> cmd) { log_image(*cmd); });
}
...
// Another handler.
void second_handler::so_define_agent() {
  so_subscribe_self()
    // Handles two types.
    .event([this](mhood_t<image_vendor_B> cmd) {...})
    .event([this](mhood_t<image_vendor_C> cmd) {...})
    // All other messages are just redirected as is.
    .event([this](mhood_t<image_base> cmd) {
      so_5::send(new_dest, cmd);
    });
}

// Yet another handler.
void third_handler::so_define_agent() {
  // Redirects all messages to another destination.
  so_subscribe_self().event([this](mhood_t<image_base> cmd) {
    so_5::send(new_dest, cmd);
  });
}

Additional requirement. It'll be great to have that functionality for handling messages from mchains too:

so_5::receive(from(mchain).handle_all(),
  [](so_5::mhood_t<image_vendor_A> cmd) {...},
  [](so_5::mhood_t<image_vendor_C> cmd) {...},
  [new_dest](so_5::mhood_t<image_base> cmd) { so_5::send(new_dest, cmd); });
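The requested behavior can be pictured outside of SObjectizer as "exact-type handler first, base-type handler as a fallback". The following is only a minimal standard-library sketch of that lookup rule; `base_aware_dispatcher`, `on`, `on_base` and `dispatch` are hypothetical names and say nothing about how SObjectizer would implement the feature. The sketch matches only the most-derived type exactly, so intermediate base classes are not considered.

```cpp
#include <functional>
#include <string>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

// Hypothetical message hierarchy mirroring the names used above.
struct image_base { virtual ~image_base() = default; };
struct image_vendor_A : image_base {};
struct image_vendor_B : image_base {};

// Hypothetical dispatcher: exact-type handlers first, then a base-type fallback.
class base_aware_dispatcher {
public:
  using handler_t = std::function<void(const image_base&)>;

  // Registers a handler for exactly the type Msg.
  template <class Msg>
  void on(std::function<void(const Msg&)> h) {
    exact_[std::type_index(typeid(Msg))] =
        [h = std::move(h)](const image_base& m) {
          h(static_cast<const Msg&>(m));
        };
  }

  // Registers the handler used when no exact-type handler exists.
  void on_base(handler_t h) { fallback_ = std::move(h); }

  // Returns true if some handler was invoked.
  bool dispatch(const image_base& m) const {
    auto it = exact_.find(std::type_index(typeid(m)));  // dynamic type of m
    if (it != exact_.end()) { it->second(m); return true; }
    if (fallback_) { fallback_(m); return true; }
    return false;
  }

private:
  std::unordered_map<std::type_index, handler_t> exact_;
  handler_t fallback_;
};

// Usage: vendor A has its own handler, vendor B falls through to the base handler.
std::string demo_dispatch() {
  std::string log;
  base_aware_dispatcher d;
  d.on<image_vendor_A>([&](const image_vendor_A&) { log += "A;"; });
  d.on_base([&](const image_base&) { log += "base;"; });
  d.dispatch(image_vendor_A{});
  d.dispatch(image_vendor_B{});
  return log;
}
```
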

Template error on redirecting signals

Hi everyone,
just to report that redirecting a signal gives a compilation error on Visual Studio 2019 with C++17 enabled (tested on version 16.10.1):

class redirector : public so_5::agent_t {
	...
	void on_some_immutable_signal(mhood_t<some_signal> cmd) {
		so_5::send(another_mbox, cmd);
		...
	}
};

This causes the compiler to complain on line 297 of send_functions.hpp:

error C2059: syntax error: 'template'

I'm using the latest version of SObjectizer (5.7.2.5).

I have not investigated the issue, but it seems to be a compiler bug. Is anyone else seeing this?

Any way to wait until an agent is ready to receive messages?

Hi,
I have a very simple question. Suppose I am testing an application where I have some agents registered to a global environment.
I would like to be sure that such agents are ready to get messages from channels they have subscribed to. This way I can generate fake messages and make expectations.

At the moment, I just register them and wait - say - 10ms to be on the safe side. However, this is not portable and can fail sporadically on CI builds. In general, I don't like "sleep" of any sort.

Is it possible to wait for such a "state" without adding custom logic?
For example, agents have that ensure_binding_finished. Is this what I am looking for?

For the moment, I can't use the experimental "testing" capabilities in SObjectizer since I have some stable code that can't be easily accommodated to the "scenario" semantics.

Thanks!
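For illustration only, the general "wait for readiness without sleeping" idea can be sketched with a std::promise/std::future handshake. This is plain standard-library code under the assumption that the component can signal the end of its own setup; `ready_worker` and `start_and_wait` are hypothetical names and this is not a statement about what SObjectizer guarantees at registration time.

```cpp
#include <atomic>
#include <future>
#include <thread>
#include <utility>

// Hypothetical worker standing in for an agent; not SObjectizer API.
class ready_worker {
public:
  // `ready` is fulfilled once the worker has finished its setup.
  explicit ready_worker(std::promise<void> ready)
    : thread_([this, r = std::move(ready)]() mutable {
        subscribed_.store(true);  // stands in for "subscriptions are made"
        r.set_value();            // tell the outside world we are ready
      }) {}

  ~ready_worker() { thread_.join(); }

  bool subscribed() const { return subscribed_.load(); }

private:
  std::atomic<bool> subscribed_{false};  // declared before thread_ on purpose
  std::thread thread_;
};

// The test code blocks on the future instead of sleeping for a guessed time.
bool start_and_wait() {
  std::promise<void> ready;
  std::future<void> ready_future = ready.get_future();
  ready_worker w{std::move(ready)};
  ready_future.wait();
  return w.subscribed();
}
```
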

Best practices to get a timeout?

Hi everyone,
my adventures with SObjectizer continue and I am very happy with it.
I have a question that I suppose is related to agent states, but I need your expert advice.

Suppose I have an agent that subscribes to a mailbox of images (e.g. OpenCV Mat) and shows them in real-time with OpenCV's imshow (it's just for troubleshooting and not intended for production):

//                                                   v-- this is by value for simplicity
so_subscribe(channel).event([](cv::Mat image) {
     imshow("Live Show", image);
     cv::waitKey(1);
});

The problem with this code is that not receiving images for some time causes the window to hang (the behavior is intended because of cv::waitKey). In another implementation (not based on SObjectizer) I have a way to get into another lambda when the channel does not get data for a specified amount of time.

Imagine something like this:

so_subscribe(channel)
           .event([](cv::Mat image) {
                imshow("Live Show", image);
                cv::waitKey(1);
           })
           .on_timeout(200ms, []{
                cv::destroyAllWindows();
           });

What is the best way to get something like this in SObjectizer?

Many thanks!

Marco
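The "other implementation" described above presumably boils down to a receive operation with a deadline. A minimal standard-library sketch of that idea follows; `timed_queue` and `pop_for` are hypothetical names, and the sketch does not use or imitate any SObjectizer API.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical channel: a thread-safe queue whose pop can time out.
template <class T>
class timed_queue {
public:
  void push(T value) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      items_.push_back(std::move(value));
    }
    cv_.notify_one();
  }

  // Waits up to `timeout` for an item; returns std::nullopt on timeout.
  std::optional<T> pop_for(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!cv_.wait_for(lock, timeout, [this] { return !items_.empty(); }))
      return std::nullopt;
    T value = std::move(items_.front());
    items_.pop_front();
    return value;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<T> items_;
};
```

A consumer would call pop_for with a 200 ms timeout in a loop, show the image when a value arrives, and close the window when std::nullopt comes back.
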

[idea] A possibility to set a custom mbox to be returned via agent_t::so_direct_mbox()?

This is just an idea. It's recorded here to be investigated and, maybe, implemented in some future version of SObjectizer.

Every agent in SObjectizer has its MPSC mbox called direct mbox. This mbox is automatically created by SOEnv for every new agent. That mbox is available via agent_t::so_direct_mbox() method.

The type of that mbox is hidden from a user. A user only knows that it's an MPSC mbox.

Sometimes it can be necessary to hide the agent direct mbox and to expose some proxy instead (for example, a proxy that filters or reorders messages of some types), but it's impossible to do because anyone who has a reference to an agent can call so_direct_mbox, and so_direct_mbox isn't a virtual method.

I think that SObjectizer should have a way to specify a custom MPSC mbox to be used as the agent's direct mbox. But I don't know how to do that at the moment. It requires further thinking and prototyping.

Either passing channel or channel name

Hi,
quick design & style-related question: I have a bunch of agents that publish some signals to some channels (every agent to its own channel):

agent1 -> channel1
agent2 -> channel2
agent3 -> channel3
...

Such agents are dynamic during the lifetime of the application. Signals are sent sporadically.

Now, I need a global agent - let's call it Monitor - to subscribe to all of them, even if they are not known in advance. My idea is to simply have a known channel the Monitor subscribes to - let's call it NotificationChannel. Every time a new agent appears, it sends a notification to NotificationChannel, so that Monitor can subscribe to the specific channel.

Monitor subscribes to NotificationChannel
...
agent1 sends a notification containing channel1 to NotificationChannel
Monitor subscribes to channel1
...

Two questions:

  • (design-related) what do you think of this approach?
  • (style-related) what would you pass to that NotificationChannel, a reference to the channel itself (e.g. mbox_t) or just the channel name?

Many thanks!

Compilation error with Clang 10

Hi!
SObjectizer 5.7.0 cannot be compiled with Clang 10 (seems like the reason is in the updated libstdc++). I've got the following errors:

In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.cpp:13:
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:427:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:458:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:491:37: error: no member named 'string_view' in namespace 'std'
                return make_dispatcher( env, std::string_view{}, thread_count );
                                             ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:524:10: error: no member named 'string_view' in namespace 'std'
                                std::string_view{},
                                ~~~~~^
In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.cpp:12:
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:412:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:443:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:476:37: error: no member named 'string_view' in namespace 'std'
                return make_dispatcher( env, std::string_view{}, thread_count );
                                             ~~~~~^
3 errors generated.

As far as I can see, you just forgot to add #include <string_view> to pub.hpp.

Hope it'll help.

Method unsubscribe_event_handlers for mbox has to be noexcept

The method abstract_message_box_t::unsubscribe_event_handlers is not noexcept now (in SO-5.5/5.6/5.7). But this method is often called in noexcept contexts (like destructors or catch blocks). So it seems that unsubscribe_event_handlers has to be noexcept starting with 5.8.0.

User payload for event handler subscription

SObjectizer allows agents to be prioritized, but often that is not enough. There are two other prioritization levels. The first is the level of messages: a dispatcher can define a specific class (derived from message_t) from which prioritized messages inherit, and resolve the message priority through this class. This is great, but may sometimes be too expensive. The other is the level of event handlers. It is powerful enough and cheap, and it is the best solution for my case. Unfortunately, there is no way to use that level at the moment. I had to patch SObjectizer to pass the priority through subscription_bind_t::event and store it in execution_hint_t, but I wish SObjectizer had a native way to do it.

An optional argument of subscription_bind_t::event for a user payload looks like a suitable way that doesn't break backward compatibility. It may also open other possibilities for custom dispatchers.

[idea] Support of delivery filters for MPSC mboxes?

The current version of SObjectizer prohibits the usage of delivery filters with MPSC mboxes. The root of this ban is somewhere in the past. MPSC mboxes were created as an optimization trick to speed up message exchange in 1-to-1 scenarios. Mutable messages and the role of MPSC mboxes for mutable messages were discovered much later.

There is little sense in supporting delivery filters for standard MPSC mboxes (available via agent_t::so_direct_mbox()). But there can be some forms of custom MPSC mboxes (like unique-subscribers mboxes from upcoming so5extra-1.5), and the support for delivery filters for those custom mboxes could be crucial.

The problem is that the current version of SObjectizer doesn't provide a way to specify a delivery filter for a mutable message. Message mutability isn't supported by agent_t::so_set_delivery_filter methods.

[idea] Passing of some message_sink_t to subscribe_event_handler instead of agent_t

NOTE. This idea is for the next big release of SObjectizer (probably breaking compatibility with 5.7-branch). It's unknown at the moment when work on that big release could be started.

In the current version of SObjectizer, only agents can make subscriptions to messages passed to message boxes. It's reflected in the format of abstract_message_box_t::subscribe_event_handler method:

//! Add the message handler.
virtual void
subscribe_event_handler(
	//! Message type.
	const std::type_index & type_index,
	//! Optional message limit for that message type.
	const message_limit::control_block_t * limit,
	//! Agent-subscriber.
	agent_t & subscriber ) = 0;

This prevents some interesting tricks like the chaining of mboxes. For example, there could be a standard MPMC mbox to which some messages are sent. An instance of so_5::extra::mboxes::round_robin mbox can be subscribed to that MPMC mbox as a subscriber. And one of the subscribers to the round_robin mbox can be a so_5::extra::mboxes::retained_mbox. And so on.

It seems that there could be an abstract interface named message_sink_t that can be used as a destination for messages. Class agent_t will implement that interface. Message boxes and message chains can implement that interface too.

It would make it possible to subscribe an mbox to another mbox. And that opens a possibility to create chains of mboxes like the one described above.

I think it can make SObjectizer yet more flexible. But this idea is at the very early stage and needs additional thinking and experimenting.
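To make the chaining idea concrete, here is a rough standard-library sketch in which every destination implements one small interface, so one "mbox" can be a subscriber of another. The interface name message_sink_t is taken from the text above, but its single `push` method, the string payload, and the types `collector`, `broadcast_mbox` and `round_robin_mbox` are assumptions made purely for illustration; none of this is existing SObjectizer code.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Hypothetical interface named after the idea above; not an existing SObjectizer type.
struct message_sink_t {
  virtual ~message_sink_t() = default;
  virtual void push(const std::string& msg) = 0;
};

// A terminal sink that records what it receives (stands in for an agent).
struct collector : message_sink_t {
  std::vector<std::string> received;
  void push(const std::string& msg) override { received.push_back(msg); }
};

// A sink that delivers every message to all of its subscribers.
struct broadcast_mbox : message_sink_t {
  std::vector<std::shared_ptr<message_sink_t>> subscribers;
  void push(const std::string& msg) override {
    for (auto& s : subscribers) s->push(msg);
  }
};

// A sink that delivers each message to the next subscriber in turn.
struct round_robin_mbox : message_sink_t {
  std::vector<std::shared_ptr<message_sink_t>> subscribers;
  std::size_t next = 0;
  void push(const std::string& msg) override {
    if (subscribers.empty()) return;
    subscribers[next]->push(msg);
    next = (next + 1) % subscribers.size();
  }
};

// Chain: broadcast -> round robin -> {a, b}. Returns how many messages a and b got.
std::vector<std::size_t> demo_chain() {
  auto a = std::make_shared<collector>();
  auto b = std::make_shared<collector>();
  auto rr = std::make_shared<round_robin_mbox>();
  rr->subscribers.push_back(a);
  rr->subscribers.push_back(b);

  broadcast_mbox top;
  top.subscribers.push_back(rr);  // an "mbox" subscribed to another "mbox"
  top.push("m1");
  top.push("m2");
  top.push("m3");
  return {a->received.size(), b->received.size()};
}
```
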

so_set_delivery_filter and lvalue references

It seems that this code can't be compiled with the current version of SObjectizer:

void so_define_agent() override {
  auto filter = [](const some_message &) { return true; };
  so_set_delivery_filter(mbox, filter);
}

because so_set_delivery_filter expects an rvalue reference to a lambda function.

Maybe it's necessary to fix so_set_delivery_filter to allow acceptance of lvalue references (const and non-const)?
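The usual C++ technique for accepting both lvalues and rvalues is a forwarding reference combined with std::decay_t for the stored type. The sketch below shows that technique in isolation; `filter_holder`, `set_filter` and `accepts` are hypothetical names and this is not the actual so_set_delivery_filter implementation.

```cpp
#include <functional>
#include <type_traits>
#include <utility>

// Hypothetical stand-in for a filter holder; not the SObjectizer implementation.
class filter_holder {
public:
  // Forwarding reference: binds to lvalues (const or not) and to rvalues.
  template <class Lambda>
  void set_filter(Lambda&& lambda) {
    // std::decay_t strips references and cv-qualifiers, so a copy
    // (for lvalues) or a moved-from value (for rvalues) is stored.
    using stored_t = std::decay_t<Lambda>;
    filter_ = stored_t(std::forward<Lambda>(lambda));
  }

  bool accepts(int value) const { return filter_ && filter_(value); }

private:
  std::function<bool(int)> filter_;
};

// Usage: the same method takes a named lambda and a temporary one.
bool demo_filter() {
  filter_holder h;
  const auto is_even = [](int v) { return v % 2 == 0; };
  h.set_filter(is_even);                       // const lvalue
  const bool lvalue_ok = h.accepts(4) && !h.accepts(3);
  h.set_filter([](int v) { return v > 10; });  // rvalue
  return lvalue_ok && h.accepts(11) && !h.accepts(4);
}
```
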

Mbox with contention?

Hi,
I hope it's the right place for asking questions.
Please, forgive me if this question is silly, I am learning the basics of SObjectizer.

I have understood that an mbox makes it possible to dispatch messages to several consumers. As explained here, every consumer receives a copy of each message.

Keeping N consumers, I am wondering if it's possible to have, instead, that every message is received and processed exactly by one consumer, the "first that is free" (clearly, this is a simplification). This way we have contention on the Mbox. It's the classical pattern "N jobs, M workers".

How can I properly model this pattern with SObjectizer?

Many thanks,

Marco
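For reference, the "N jobs, M workers" pattern itself, independent of SObjectizer, is a single shared queue from which idle workers take the next job. The following is a minimal standard-library sketch of that pattern; `run_jobs` is a hypothetical helper and the code makes no claim about which SObjectizer facility maps to it best.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Pushes `jobs` job ids through `workers` threads. Each job is taken by exactly
// one worker. Returns how many times each job was processed.
std::vector<int> run_jobs(std::size_t jobs, std::size_t workers) {
  std::mutex mutex;
  std::condition_variable cv;
  std::queue<std::size_t> queue;
  bool closed = false;
  std::vector<int> processed(jobs, 0);

  auto worker = [&] {
    for (;;) {
      std::size_t job;
      {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [&] { return closed || !queue.empty(); });
        if (queue.empty()) return;  // closed and fully drained
        job = queue.front();
        queue.pop();
      }
      ++processed[job];  // safe: no other worker ever receives this job id
    }
  };

  std::vector<std::thread> pool;
  for (std::size_t i = 0; i < workers; ++i) pool.emplace_back(worker);

  for (std::size_t j = 0; j < jobs; ++j) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      queue.push(j);
    }
    cv.notify_one();  // wake one free worker
  }
  {
    std::lock_guard<std::mutex> lock(mutex);
    closed = true;
  }
  cv.notify_all();
  for (auto& t : pool) t.join();
  return processed;
}
```
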

Use different message types or different mboxes?

I'm quite impressed by SObjectizer and I want to convert my application to use it. However, at the moment I'm having trouble understanding the design I should follow when defining messages and mboxes:

When should I use different message types and when should I use the same types on different mailboxes?

My application controls a number of identical devices (the devices are not known until start-up). Most of the messages to and from the devices are of the same basic type (a triplet of real numbers), although they mean different things. My idea is to have several actors which listen to messages from all devices and do some state estimation, and a few which consume both estimations and observations and act as controllers.

In my message broker (MQTT) I have one topic per device (e.g. "device1/measured", "device1/setpoint", "device1/status") because that's the way MQTT is supposed to be used, but I don't believe it is a good idea to have one mbox per device in the application.

I guess my options are:

  1. One generic "board" mbox, with different message subclasses to identify the topic of the message (eg, define "Measured", "Setpoint", "Status", and derive both "Measured" and "Setpoint" from the same class).
  2. One mbox per topic, use the same class for messages whose data is essentially the same (e.g. have a "Measured" and "Setpoint" mboxes, and publish the same type of objects to both).
  3. A combination of (1) and (2): several mboxes and different message classes too.

Regards,

Juan.

Synchronous shutdown

I am experimenting with switching to sobjectizer in an existing project. At the moment, the objects in this project get created and destroyed dynamically, and they have internal threading (event loops) that gets shut down and waited for in the destructor.

I removed those internal threads and replaced them by companion actors that do the event handling. That works, but I want to make sure that an actor has actually shut down before finishing the destructor. From what I see, this is currently not happening in a thread-safe manner...

Example code

class MyClassCompanion : public so_5::agent_t {
    MyClass& _parent;
    // do things with parent on events
};


class MyClass {
    MyClassCompanion* _actor;
    MyClass() {
        so_5::introduce_child_coop(env.get_root_coop(), [&](so_5::coop_t& coop) {
            _actor = coop.make_agent<MyClassCompanion>(*this);
        });
    }

    ~MyClass() {
          _actor->so_deregister_agent_coop_normally();
    }
};

When I run valgrind on my solution, I see that sometimes the actor is still accessing its _parent although the destructor has finished. That leads me to the conclusion that so_deregister_agent_coop_normally is not synchronous, and that something is still running after it returns...

  1. is so_deregister_agent_coop_normally thread-safe and synchronous?
  2. is the way of spawning agents in the constructor correct? is that thread-safe?
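The general remedy for "a stop request returns before the worker has really stopped" is an explicit completion signal that the destructor blocks on. The sketch below illustrates only that idea with the standard library: `owner` plays the role of MyClass, the thread plays the role of the companion, and all names are hypothetical. How such a completion signal would be obtained from SObjectizer is not shown here.

```cpp
#include <atomic>
#include <future>
#include <thread>
#include <utility>

// Hypothetical stand-in for MyClass with a companion running elsewhere.
class owner {
public:
  explicit owner(std::atomic<bool>& companion_alive) {
    std::promise<void> done;
    finished_ = done.get_future();
    companion_ = std::thread(
        [this, &companion_alive, done = std::move(done)]() mutable {
          companion_alive.store(true);
          while (!stop_requested_.load()) {
            touches_.fetch_add(1);  // the companion uses its parent here
            std::this_thread::yield();
          }
          companion_alive.store(false);
          done.set_value();         // last action: report completion
        });
  }

  ~owner() {
    stop_requested_.store(true);  // only a request; it returns immediately
    finished_.wait();             // block until the companion has really finished
    companion_.join();
  }

private:
  std::atomic<bool> stop_requested_{false};
  std::atomic<long> touches_{0};
  std::future<void> finished_;
  std::thread companion_;
};
```

After the destructor returns, the companion is guaranteed not to touch the parent any more, because the destructor waited for the completion signal rather than for the stop request alone.
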

Timers problem when Boost is also used

Hi!

Thank you for a very cool library!

I have tried to use your library together with Boost, and I am confused because the build fails even if I only add the Boost library without actually using it.

I started by building my application without Boost, and the build was successful. Then I added boost/1.73.0 to the requirements in my conanfile.txt, and the builds started failing. The problem appears at the linking stage:

CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `so_5::details::event_subscription_helpers::ensure_handler_can_be_used_with_mbox(so_5::details::msg_type_and_handler_pair_t const&, so_5::intrusive_ptr_t<so_5::abstract_message_box_t> const&)':
main.cpp:(.text._ZN4so_57details26event_subscription_helpers36ensure_handler_can_be_used_with_mboxERKNS0_27msg_type_and_handler_pair_tERKNS_15intrusive_ptr_tINS_22abstract_message_box_tEEE[_ZN4so_57details26event_subscription_helpers36ensure_handler_can_be_used_with_mboxERKNS0_27msg_type_and_handler_pair_tERKNS_15intrusive_ptr_tINS_22abstract_message_box_tEEE] 0xc0): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'
CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `Agent* so_5::details::event_subscription_helpers::get_actual_agent_pointer<Agent>(so_5::agent_t&)':
main.cpp:(.text._ZN4so_57details26event_subscription_helpers24get_actual_agent_pointerI5AgentEEPT_RNS_7agent_tE[_ZN4so_57details26event_subscription_helpers24get_actual_agent_pointerI5AgentEEPT_RNS_7agent_tE] 0xbd): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'
CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `so_5::message_payload_type_impl<Updater, false>::extract_payload_ptr(so_5::intrusive_ptr_t<so_5::message_t>&)':
main.cpp:(.text._ZN4so_525message_payload_type_implI7UpdaterLb0EE19extract_payload_ptrERNS_15intrusive_ptr_tINS_9message_tEEE[_ZN4so_525message_payload_type_implI7UpdaterLb0EE19extract_payload_ptrERNS_15intrusive_ptr_tINS_9message_tEEE] 0x98): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'

I have also tried to use boost/1.69.0 and boost/1.71.0, but it did not have any effect: the errors remain, and the messages are similar.

I created a simple repository with code that reproduces the error: https://github.com/FulgurIgor/SObjectizer-error. It has a Travis-CI build where the error is demonstrated.

Source code: https://github.com/FulgurIgor/SObjectizer-error/blob/master/src/main.cpp

Tested systems:

  • Fedora 32, GCC 10.1, SObjectizer/5.7.1, Boost/1.73.0 (and also Boost/1.71.0 and Boost/1.69.0).
  • Ubuntu Bionic, GCC 7.4, SObjectizer/5.7.1, Boost/1.73.0

All builds use Conan and CMake.

[idea] Registration of user-provided mboxes as named ones

The current version of environment_t::create_mbox(name) always returns a mbox created by SObjectizer. It seems that we can give the user a way to register their own mbox as a named one.

Something like:

so_5::mbox_t custom_mbox = so_5::extra::mboxes::unique_subscribers::make(env);
env.register_named_mbox("my_mbox", custom_mbox);
...
auto mbox = env.create_mbox("my_mbox"); // the custom_mbox will be returned.

The changes to SObjectizer can look like:

namespace so_5
{

namespace named_mbox
{

enum class not_unique_name_reaction_t
{
  throw_exception,
  dont_throw_exception
};

constexpr auto throw_if_not_unique = not_unique_name_reaction_t::throw_exception;
constexpr auto nothrow_if_not_unique = not_unique_name_reaction_t::dont_throw_exception;

} /* namespace named_mbox */

class environment_t
{
public:
    ...
    // Returns false if name isn't unique and reaction is nothrow_if_not_unique.
    bool
    register_named_mbox(nonempty_name_t mbox_name,
      mbox_t mbox,
      named_mbox::not_unique_name_reaction_t reaction = named_mbox::throw_if_not_unique);
    ...
};

} /* namespace so_5 */

Use so_default_state as a parent state for another state

Hi!

Is it possible with SObjectizer to use the default agent state (which is returned by so_default_state) as a parent state for another state? I am trying to do something like this:

state_t NotStarted{initial_substate_of(so_default_state()), "NotStarted"};

Unfortunately, initial_substate_of takes a non-const reference but so_default_state returns a const reference, so it doesn't compile.

Is there any workaround (other than introducing my own "default" state and making it the parent of the other states)? Is it a limitation by design?

Thank you.

Backwards compatible way of closing mchains

Hi!
I have just seen the new release and I have a small suggestion for the wiki.

Since abstract_message_chain_t::close(close_mode) is now deprecated, I would explicitly mention in the release notes (wiki) how to replace this call (and related ones) with the new one in order to get the very same behavior.

For example:

close_retain_content(chain);

is now equivalent to:

close_retain_content(so_5::exceptions_enabled, chain);

What do you think?

Generic message limits for all message types

Hello.

I've read about message limits https://github.com/Stiffstream/sobjectizer/wiki/SO-5.7-InDepth-Message-Limits and a post on Habrahabr about collector-performer pattern.

But I am a little surprised that message limits can be defined only for specific messages. It seems that it's not possible (at least not in an easy way) to define message limits for all message types in a mbox.

Is there any way of doing such a thing in SObjectizer? For now I see only one option: inherit all message types from some class generic_message_t and then define limit_and_X<generic_message_t> (I hope it works).

Default dispatcher binder of child coops

There is a way to set a dispatcher binder for a coop. This binder will be used by default for all agents of that coop instead of the default binder from the environment infrastructure. Is there a way to force child coops of that coop to use this binder by default too?

Any benchmark program available?

Hi everyone,
this might be a weird question, please let me know if you need more context/information.

I am wondering if any of you have crafted a sort of SObjectizer benchmark program to get some "generic" insights on the performance and throughput of SObjectizer on different machines and targets.

Alternatively, do you have any general tips to write such a program myself? Which kind of things would you measure? Can I use any performance tests you have released already?

Note: I already have performance tests covering specific applications, and I also produce profile reports. What I need instead is a quick way to test and set expectations on SObjectizer's capabilities on many machines (possibly built with proprietary hardware) subjected to different kinds of load.

Many thanks!

Marco

Handling any message type

Hi,
I have a very simple question: is it possible to make a subscription handler that will be called regardless of the message type?

Something like this:

so_subscribe(board_).event([](any_message_t m){
   // do something
});

// ...
send<int>(board_, 5); // handler called
send<MyType>(board_, MyType{}); // handler called
// ...

What's the recommended practice here?

Thanks.

Add a custom handler for events that have no handler in the current state

Hi!

I am trying to use the FSM functionality that is already integrated into SObjectizer (by the way, thank you a lot for such an architecture design, it's very useful in agent development).

I understand how to switch between states and how to add different event handlers for different states.

In my situation I need to add a custom reaction (more precisely, send a message to another agent that is used for error logging) for events that were sent to an agent but were not handled because no handler was found for them (e.g. an event A is not allowed in a state ST and I want to notify my error-handling agent about such a situation).

Is there any way of doing this? For example, it can be done in Boost.SML. See https://boost-experimental.github.io/sml/examples.html#error-handling - work with unexpected_event (by the way, if you see any features that are missing from SObjectizer's FSM implementation but are present in Boost.SML, it would be nice to add them to SObjectizer as well :) )

I've already checked an example about no_handler_message_tracing: https://github.com/Stiffstream/sobjectizer/blob/master/dev/sample/so_5/nohandler_msg_tracing/main.cpp
But I don't think that it is usable in my situation.

Thank you.
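The behavior being asked for, "if the current state has no handler for this event, call a fallback that knows the state and the event", can be sketched in a few lines of standard C++. The sketch is only meant to pin down the semantics; `tiny_fsm`, `on`, `on_unhandled`, `switch_to` and `dispatch` are hypothetical names and are not SObjectizer's or Boost.SML's API.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical state machine keyed by (state name, event name).
class tiny_fsm {
public:
  using key_t = std::pair<std::string, std::string>;  // (state, event)
  using fallback_t =
      std::function<void(const std::string&, const std::string&)>;

  void on(const std::string& state, const std::string& event,
          std::function<void()> h) {
    handlers_[key_t(state, event)] = std::move(h);
  }

  // Called with (state, event) whenever no handler matches.
  void on_unhandled(fallback_t h) { unhandled_ = std::move(h); }

  void switch_to(std::string state) { current_ = std::move(state); }

  void dispatch(const std::string& event) {
    auto it = handlers_.find(key_t(current_, event));
    if (it != handlers_.end()) {
      it->second();
    } else if (unhandled_) {
      unhandled_(current_, event);  // e.g. forward to an error-logging agent
    }
  }

private:
  std::string current_ = "idle";
  std::map<key_t, std::function<void()>> handlers_;
  fallback_t unhandled_;
};

// Usage: "start" is allowed in "idle" but not in "running".
std::string demo_unhandled() {
  std::string log;
  tiny_fsm fsm;
  fsm.on("idle", "start", [&] {
    log += "started;";
    fsm.switch_to("running");
  });
  fsm.on_unhandled([&](const std::string& st, const std::string& ev) {
    log += "unhandled " + ev + " in " + st + ";";
  });
  fsm.dispatch("start");
  fsm.dispatch("start");
  return log;
}
```
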

How to safely catch and forward an OS signal (SIGINT, SIGHUP, etc)

I need to be able to stop the environment when the application receives a SIGINT/SIGQUIT and send a message to a component to reload the configuration when SIGHUP is received.

Doing this seems to work:

  auto stop_app = [](int signal) -> void {
      main_env->stop();
  };
...
std::signal(SIGTERM, stop_app);

However, reading the code in environment_t::stop() I suspect it is not safe to call from the context of an async signal. The same goes for sending a message to a mbox/mchain, AFAIU there is the possibility of a deadlock if the signal just happens to be delivered while the message box mutex is being held.

How can OS signals be handled safely? I can only think of a few options:

  • Have a dedicated thread to handle signals.
  • Make a custom implementation of mchain that uses a self-pipe.
  • Periodically poll signals through a signalfd.
  • Wait on a signalfd from another thread and send a message through a mchain.
  • Make a custom timer thread implementation that replaces the condition variable by an eventfd or self-pipe, so that it can simultaneously wait on other FDs or receive data from a signal handler.
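A portable variant of the "dedicated thread" option keeps the handler trivial: it only stores the signal number in a lock-free atomic, and an ordinary thread polls that value and does the real work outside of signal context. The sketch below uses only the C++17 standard library; `g_pending_signal`, `record_signal` and `wait_for_signal` are hypothetical names, and the call that would stop the environment or send a message is deliberately left out.

```cpp
#include <atomic>
#include <chrono>
#include <csignal>
#include <thread>

// A signal handler may safely touch lock-free atomics only.
static_assert(std::atomic<int>::is_always_lock_free,
              "the handler relies on a lock-free atomic");

// The only state shared between the handler and normal code.
std::atomic<int> g_pending_signal{0};

// Signal handler: records the signal number and nothing else.
void record_signal(int signo) { g_pending_signal.store(signo); }

// Polls the flag from a normal thread. Returns the signal number that was
// seen, or 0 if `max_polls` polls passed without any signal.
int wait_for_signal(int max_polls, std::chrono::milliseconds interval) {
  for (int i = 0; i < max_polls; ++i) {
    if (int signo = g_pending_signal.exchange(0)) return signo;
    std::this_thread::sleep_for(interval);
  }
  return 0;
}
```

A watcher thread would install the handler with std::signal, call wait_for_signal in a loop, and react to the returned number from regular thread context, where taking locks is allowed.
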

A possibility to drop all subscriptions with total deactivation of an agent?

NOTE: it's just an idea inspired by #28. It may not get any implementation. I'm recording it in the issues so I can return to it when there is a possibility.

Sometimes it's necessary to deactivate some agent and keep it in that deactivated state until the coop of the agent is deregistered.

A developer can do that by using an additional state without any subscriptions:

class demo : public so_5::agent_t
{
  state_t st_deactivated{this};
  ...
  void on_some_event() {
    if(has_to_be_deactivated())
      this >>= st_deactivated; // Agent won't react to new messages anymore.
    ...
  }
};

But this approach has a drawback: anyone can still send messages to that agent and some resources will be consumed. It's because all subscriptions of the agent remain valid and messages sent to the deactivated agent will be stored to the agent's event-queue, extracted, checked for the presence of an event-handler in the current state, and only then those messages will be ignored.

It seems that SObjectizer can do better. There is already a special invisible state awaiting_deregistration_state that is used when an agent throws an exception, but the exception reaction is set to deregister_coop_on_exception. So we can provide a new agent_t's method so_deactivate_agent that does the following actions:

  • drops all the subscriptions to avoid resource consumption if someone tries to send a message to the deactivated agent;
  • switches the agent to awaiting_deregistration_state. In that state, the agent will wait for the deregistration of its coop.

Or we can just add a new method so_drop_all_subscriptions to remove all subscriptions regardless of state, including deadletter handlers.

noexcept for closing mchains

At the moment, the abstract_message_chain_t::close() method and the close_drop_content()/close_retain_content() functions are not marked as noexcept.

It seems that closing an mchain shouldn't be a throwing operation, because it's often called from destructors.

This should be investigated carefully, and those methods/functions should be marked as noexcept if the implementation allows it.
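A minimal sketch of why this matters, using hypothetical `demo_chain` and `auto_closer` types rather than SObjectizer's own classes. Destructors are implicitly noexcept, so an exception escaping from a close operation called in a destructor ends in std::terminate. Marking the close operation noexcept makes that contract explicit and lets it be checked at compile time:

```cpp
#include <utility>

// Hypothetical stand-in for a message chain; not a SObjectizer type.
struct demo_chain {
  bool closed = false;

  // The close operation promises not to throw.
  void close() noexcept { closed = true; }
};

// Hypothetical RAII helper that closes the chain in its destructor.
class auto_closer {
  demo_chain * m_chain;

public:
  explicit auto_closer(demo_chain & chain) noexcept : m_chain{&chain} {}

  auto_closer(const auto_closer &) = delete;
  auto_closer & operator=(const auto_closer &) = delete;

  // Destructors are noexcept by default: if close() could throw,
  // an escaping exception here would call std::terminate.
  ~auto_closer() { m_chain->close(); }
};

// Compile-time check that the close operation is declared noexcept.
static_assert(noexcept(std::declval<demo_chain &>().close()),
    "close() must not throw because destructors rely on it");
```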

Drop oldest agent message limit?

Hi everybody,
speaking about message limits, I know that limit_then_drop makes the agent ignore new messages. I am wondering if it's possible to drop oldest messages somehow (as it happens with message chains' overflow policy so_5::mchain_props::overflow_reaction_t::remove_oldest).

Many thanks!

Marco
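To make the difference between the two overflow behaviors concrete, here is a small sketch in plain C++. It is not SObjectizer code: `bounded_queue` and `overflow_policy` are hypothetical names used only to illustrate "ignore the new message" versus "remove the oldest message" on a full queue.

```cpp
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical policies: what to do when the queue is already full.
enum class overflow_policy { drop_newest, drop_oldest };

// Hypothetical bounded queue; only illustrates the two policies.
template<typename T>
class bounded_queue {
  std::deque<T> m_items;
  std::size_t m_capacity;
  overflow_policy m_policy;

public:
  bounded_queue(std::size_t capacity, overflow_policy policy)
    : m_capacity{capacity}, m_policy{policy} {}

  void push(T value) {
    if(0u == m_capacity)
      return; // Nothing can be stored at all.

    if(m_items.size() < m_capacity) {
      m_items.push_back(std::move(value));
      return;
    }

    if(overflow_policy::drop_oldest == m_policy) {
      // Make room by discarding the oldest stored item.
      m_items.pop_front();
      m_items.push_back(std::move(value));
    }
    // With drop_newest the incoming item is simply discarded.
  }

  const std::deque<T> & items() const noexcept { return m_items; }
};
```

With a capacity of 2 and pushes of 1, 2, 3, the drop_newest queue keeps 1 and 2, while the drop_oldest queue keeps 2 and 3.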

handle_n for mbox?

Hi,
what is your recommended practice for waiting until exactly N messages have been received from a mailbox? Imagine something similar to handle_n for message chains. I think I have read something about it on the wiki but I cannot remember where...

Many thanks.

Marco

state_t's on_enter/on_exit should accept noexcept methods

For example:

class my_agent : public so_5::agent_t {
  state_t st_normal{ this };
  ...
  void on_enter_st_normal() noexcept {...}
  ...
  void so_define_agent() override {
    st_normal.on_enter(&my_agent::on_enter_st_normal); // (1)
    ...
  }
};

Now the compilation fails at point (1) because in C++17 noexcept is a part of the method signature and that case is not handled in SObjectizer's is_agent_method_pointer metafunction.
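The effect can be reproduced outside SObjectizer with a small trait. This is only a sketch: `is_void_method_pointer` and `demo_agent` are hypothetical names, and the real is_agent_method_pointer metafunction is not reproduced here. It shows that in C++17 a pointer to a noexcept method is a distinct type that needs its own partial specialization:

```cpp
#include <type_traits>

// Hypothetical trait that detects pointers to `void()` methods.
template<typename T>
struct is_void_method_pointer : std::false_type {};

// Matches `void (C::*)()`.
template<typename C>
struct is_void_method_pointer<void (C::*)()> : std::true_type {};

// Since C++17 `void (C::*)() noexcept` is a different type, so it
// needs a separate specialization. Without this one the trait would
// report false for noexcept methods.
template<typename C>
struct is_void_method_pointer<void (C::*)() noexcept> : std::true_type {};

// Hypothetical class with one plain and one noexcept method.
struct demo_agent {
  void on_enter_plain() {}
  void on_enter_noexcept() noexcept {}
};

// The two method pointers really have different types in C++17.
static_assert(!std::is_same_v<
    decltype(&demo_agent::on_enter_plain),
    decltype(&demo_agent::on_enter_noexcept)>);

// Both are accepted once the noexcept specialization exists.
static_assert(
    is_void_method_pointer<decltype(&demo_agent::on_enter_plain)>::value);
static_assert(
    is_void_method_pointer<decltype(&demo_agent::on_enter_noexcept)>::value);
```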

Classical endless loop?

Hi,
this is probably a simple question but I am wondering which are your recommended answers.

I need to encapsulate a classical endless loop into a SObjectizer agent. Something like this:

#include <atomic>
#include <thread>

class Worker
{
public:
   Worker()
   {
      // Start the loop on a dedicated thread.
      m_thread = std::thread{[this]{
         while (!m_stop)
         {
            // ... do something
         }
      }};
   }

   void StopAndWait()
   {
      m_stop = true;
      m_thread.join();
   }

private:
   std::thread m_thread;
   std::atomic<bool> m_stop{false};
};

Also, I would like to remove "StopAndWait" and let SObjectizer handle shutdown automatically as it happens normally.

I guess this can be modeled with states. Do you recommend any common implementations?

Many thanks.

Comparison operators for coop_handle_t

The type coop_handle_t plays a role similar to a smart pointer but doesn't have comparison operators. This can lead to an unexpected result in the following case:

so_5::coop_handle_t h1 = ...;
so_5::coop_handle_t h2 = ...;
if(h1 == h2) { // OOPS!
   ...
}

because the results of conversion from coop_handle_t to bool will be compared in if(h1 == h2).

So it would be desirable to have comparison operators for coop_handle_t.
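The pitfall can be shown with a standalone sketch. Both `naive_handle` and `comparable_handle` are hypothetical types, not the actual coop_handle_t. The first has only an implicit conversion to bool, so `==` compares two bools. The second adds explicit comparison operators, which overload resolution prefers over the conversion:

```cpp
#include <cstdint>

// Hypothetical handle with only an implicit conversion to bool.
class naive_handle {
  std::uint64_t m_id; // 0 means "empty".

public:
  explicit naive_handle(std::uint64_t id = 0) noexcept : m_id{id} {}

  operator bool() const noexcept { return 0 != m_id; }
};

// Hypothetical handle that also provides comparison operators.
class comparable_handle {
  std::uint64_t m_id; // 0 means "empty".

public:
  explicit comparable_handle(std::uint64_t id = 0) noexcept : m_id{id} {}

  operator bool() const noexcept { return 0 != m_id; }

  friend bool operator==(
      const comparable_handle & a, const comparable_handle & b) noexcept
  { return a.m_id == b.m_id; }

  friend bool operator!=(
      const comparable_handle & a, const comparable_handle & b) noexcept
  { return !(a == b); }
};

// Compiles, but compares the results of the conversions to bool:
// any two non-empty handles are reported as equal.
inline bool naive_equal(const naive_handle & a, const naive_handle & b)
{ return a == b; }
```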

The application consumes almost 100% CPU (1 thread) all the time.

Hi,
I am using SObjectizer in our projects. I have some observation on the CPU utilisation. The following is a simple code with one thread handling messages on 2 channels.
// Channel 1 -> process a request and post a message to channel 2
// Channel 2 -> process a request and post a message to channel 1
And the loop continues.

Observation: The application consumes almost 100% CPU (1 thread) all the time.

Query: Please help make it CPU efficient / event based? I admit I am still learning the SObjectizer library.

Sample Code:

#include <chrono>
#include <iostream>
#include <memory>
#include <random>
#include <thread>

// SObjectizer is one of a few cross-platform and OpenSource "actor frameworks" for C++. 
// But SObjectizer supports not only Actor Model, but also Publish-Subscribe Model and CSP-like channels. 
// ver 5.7
#include <so_5/all.hpp>

// Message structure
struct poll_device_events_request_t { };

void thread_func(so_5::mchain_t primary_channel, so_5::mchain_t secondary_channel)
{
  // The stuff necessary for random pause generation.
  std::random_device rd;
  std::mt19937 generator{rd()};
  std::uniform_int_distribution<> pause_generator{5000, 10000}; // Delay between 5 - 10 seconds.

  // This flag will be set to 'true' when some of channels will be closed.
  bool stop = false;
  auto prepared = so_5::prepare_select(
    so_5::from_all().handle_all()
      // If some channel gets closed we should set out 'stop' flag.
      .on_close([&stop](const auto &) { stop = true; })
      // A predicate for stopping select() function.
      .stop_on([&stop]{ return stop; }),

    // Read and handle poll_device_events_request messages from channel_.
    so_5::receive_case(primary_channel, 
      [&](poll_device_events_request_t){
        
        std::cout << "Received message on primary channel..." << std::endl;
        // Do some work.
        
        // Generate a random pause.
        const std::chrono::milliseconds pause{pause_generator(generator)};
        
        // Send a message to secondary channel - delayed message.
        so_5::send_delayed<poll_device_events_request_t>(secondary_channel, pause);
      }
    ),
    so_5::receive_case(secondary_channel, 
      [&](poll_device_events_request_t){
        std::cout << "Received message on secondary channel..." << std::endl;
        // Do some work.
        
        // Generate a random pause for processing of that request.
        const std::chrono::milliseconds pause{pause_generator(generator)};
        
        // Delay processing of that request by using delayed message.
        so_5::send_delayed<poll_device_events_request_t>(primary_channel, pause);
      }
    )
  );
  
  while (!stop) {
    so_5::select( prepared );
  }
}

// Let's create two channels - 
// Channel 1 -> process a request and post a message to channel 2
// Channel 2 -> process a request and post a message to channel 1
  
int main() {
  std::shared_ptr<so_5::wrapped_env_t> s_node_env_obj_ = std::make_shared<so_5::wrapped_env_t>();
  
  auto primary_channel_ = so_5::create_mchain(*s_node_env_obj_);
  auto primary_channel_closer = so_5::auto_close_drop_content(primary_channel_);
	
  auto secondary_channel_ = so_5::create_mchain(*s_node_env_obj_);
  auto secondary_channel_closer = so_5::auto_close_drop_content(secondary_channel_);
  
  // Thread objects for workers.
  std::thread nthread_ {thread_func, primary_channel_, secondary_channel_};
  auto nthread_joiner = so_5::auto_join( nthread_ );

  // Initiate the first message.
  so_5::send<poll_device_events_request_t>(primary_channel_);

  nthread_.join();
  return 0;
}

Agent shutdown

Hi,
I have a simple question: is it possible to stop an agent from inside of that agent? In other words, can an agent signal to the environment that it wants to be stopped/terminated/finished in a normal way?

so_deregister_agent_coop_normally() and so_deregister_agent_coop() are close to what I need, however such functions deregister the entire cooperation. My current approach is based on parent-child relations:

  • create a "root" cooperation
  • for each agent, create a child cooperation and register the agent in it
  • when an agent needs to be deregistered, call so_deregister_agent_coop_normally() (this will result in shutting down that agent only)

Is that a good approach?

Just to give you an example of why I need this: I have an agent that communicates with a child process. When the child process sends back some particular data or simply terminates, that agent must be stopped.

Many thanks.

PS I hope to see you in 1-hour at the meetup :)

[Best practice] How to unsubscribe from all remaining messages on shutdown?

Hi,
suppose I have an agent that can be slow at processing incoming messages. Under some circumstances, I don't want to wait for it to complete when shutdown is sent from the environment, instead I want to terminate it immediately.

The agent is subscribed to a SPMC message box that might contain several messages already. I just want to ignore them all when shutdown is requested.

I had a look at stop guards but I am not sure this is the way to go (it seems to me that they solve exactly the opposite problem).

What's the best practice to get such a behavior?

Let's start from a skeleton of that agent:

class SlowAgent : public so_5::agent_t
{
public:
	SlowAgent(context_t c, so_5::mbox_t input)
		: agent_t(std::move(c)), m_input(std::move(input))
	{
	}

protected:
	void so_define_agent() override
	{
		so_subscribe_self().event([](int i) {
			// just fake some slow processing...
			std::cout << "doing " << i << "\n";
			Sleep(1000);
			std::cout << "done\n";
		});
	}
	
private:
	so_5::mbox_t m_input;
};

Thanks!

Issue in README.md

Hello. In your README.md you have the following lines:

Since v.5.5.24 SObjectizer provides sobjectizer-config.cmake files.
These files are automatically installed into <target>/lib/cmake/sobjectizer
subfolder. It allows to use SObjectizer via CMake's find_package command.

The <target> placeholder doesn't render on GitHub, because in Markdown anything between <> is treated as a quick link...

Add example for the usage of SObjectizer's layers.

There is no example that shows the usage of SObjectizer's layers among the standard SObjectizer examples. It would be good to have at least one such example (the code snippet from #6 can be used for a very simple one).

Can't enable work_thread_activity_tracking via params_tuner in wrapped_env_t

Hello. When I try to enable work_thread_activity_tracking:
so_5::wrapped_env_t env{..., [](auto& p){ p.turn_work_thread_activity_tracking_on(); }};
auto result = env.environment().work_thread_activity_tracking();
I expect result == on, but I get result == unspecified.
Perhaps the value of the flag is lost inside the copy constructor of the environment_params_t class (environment.cpp):
... , m_work_thread_activity_tracking(work_thread_activity_tracking::unspecified) ...
This doesn't look like correct behavior.
Version: 5.7.0, but I think it will also reproduce in the latest one.
Thanks in advance.

Allow the creation of intrusive_ptr_t from a pointer to agent

Class so_5::agent_t has atomic_refcounted_t as a private base class. This means that it isn't possible to create a shared pointer (in the form of so_5::intrusive_ptr_t<T>) to an agent. But sometimes this would be useful when the lifetime of an agent should be extended for some time after the agent is deregistered from SObjectizer, for example, if a pointer to the agent is passed to some callback (like callbacks in Asio).

If this feature is really useful it can be implemented in different ways:

  • atomic_refcounted_t can be a public base class for agent_t;
  • some helper function template (like make_agent_ref<T>) + intrusive_ptr_t<T> as friend for agent_t.
