Giter Site home page Giter Site logo

stiffstream / sobjectizer Goto Github PK

View Code? Open in Web Editor NEW
456.0 20.0 45.0 14.6 MB

An implementation of Actor, Publish-Subscribe, and CSP models in one rather small C++ framework. With performance, quality, and stability proved by years in the production.

Home Page: https://stiffstream.com/en/products/sobjectizer.html

License: Other

Makefile 0.08% CMake 2.46% Ruby 5.48% Batchfile 0.01% C++ 91.98%
cpp cplusplus cplusplus-17 actor-model actors actor-framework actor-library multithreading concurrency concurrent-programming

sobjectizer's Introduction

Created by gh-md-toc

What is SObjectizer?

SObjectizer is one of a few cross-platform and OpenSource "actor frameworks" for C++. But SObjectizer supports not only Actor Model, but also Publish-Subscribe Model and CSP-like channels. The goal of SObjectizer is significant simplification of development of concurrent and multithreaded applications in C++.

SObjectizer allows the creation of a concurrent app as a set of agent-objects which interact with each other through asynchronous messages. It handles message dispatching and provides a working context for message processing. And allows to tune those things by supplying various ready-to-use dispatchers.

What distinguishes SObjectizer?

Maturity. SObjectizer is based on ideas that have been put forward in 1995-2000. And SObjectizer itself is being developed since 2002. SObjectizer-5 is continuously evolved since 2010.

Stability. From the very beginning SObjectizer was used for business-critical applications, and some of them are still being used in production. Breaking changes in SObjectizer are rare and we approach to them very carefully.

Cross-platform. SObjectizer runs on Windows, Linux, FreeBSD, macOS and Android.

Easy-to-use. SObjectizer provides easy to understand and easy to use API with a lot of examples in the SObjectizer's distributive and a plenty of information in the project's Wiki.

Free. SObjectizer is distributed under BSD-3-CLAUSE license, so it can be used in development of proprietary commercial software for free.

SObjectizer is not like TBB, taskflow or HPX

SObjectizer is often compared with tools like Intel Threading Building Blocks, taskflow, HPX, and similar to them. Such comparison is just useless.

All those tools are intended to be used for solving tasks from Parallel Computing area: they allow to reduce the computational time by utilizing several CPU cores. For example, you can reencode your video file from one format to another within one hour on one CPU core, by it takes only 15 minutes on four cores. That is the main goal of Parallel Computing.

SObjectizer is intended for a slightly different area: Concurrent Computing. The main goal of SObjectizer is the simplification of doing many different tasks at once. Sometimes there is no need to use more than just one CPU core for that. But if there are several CPU cores, then SObjectizer makes the handling of those tasks and the interaction between them much easier.

The tricky part is the fact that Parallel- and Concurrent Computing use the same concurrency mechanisms and primitives (like threads, mutexes, atomics, and so on) under the hood. But from the high-level point of view Parallel- and Concurrent Computing are used for very different tasks.

As examples of applications that were or could be implemented on top of SObjectizer, we can list multithreaded proxy-server, automatic control system, MQ-broker, database server, and so on.

Show me the code!

HelloWorld example

This is a classical example "Hello, World" expressed by using SObjectizer's agents:

#include <so_5/all.hpp>

class hello_actor final : public so_5::agent_t {
public:
   using so_5::agent_t::agent_t;

   void so_evt_start() override {
      std::cout << "Hello, World!" << std::endl;
      // Finish work of example.
      so_deregister_agent_coop_normally();
   }
};

int main() {
   // Launch SObjectizer.
   so_5::launch([](so_5::environment_t & env) {
         // Add a hello_actor instance in a new cooperation.
         env.register_agent_as_coop( env.make_agent<hello_actor>() );
      });

   return 0;
}

Ping-Pong example

Let's look at more interesting example with two agents and message exchange between them. It is another famous example for actor frameworks, "Ping-Pong":

#include <so_5/all.hpp>

struct ping {
   int counter_;
};

struct pong {
   int counter_;
};

class pinger final : public so_5::agent_t {
   so_5::mbox_t ponger_;

   void on_pong(mhood_t<pong> cmd) {
      if(cmd->counter_ > 0)
         so_5::send<ping>(ponger_, cmd->counter_ - 1);
      else
         so_deregister_agent_coop_normally();
   }

public:
   pinger(context_t ctx) : so_5::agent_t{std::move(ctx)} {}

   void set_ponger(const so_5::mbox_t mbox) { ponger_ = mbox; }

   void so_define_agent() override {
      so_subscribe_self().event( &pinger::on_pong );
   }

   void so_evt_start() override {
      so_5::send<ping>(ponger_, 1000);
   }
};

class ponger final : public so_5::agent_t {
   const so_5::mbox_t pinger_;
   int pings_received_{};

public:
   ponger(context_t ctx, so_5::mbox_t pinger)
      :  so_5::agent_t{std::move(ctx)}
      ,  pinger_{std::move(pinger)}
   {}

   void so_define_agent() override {
      so_subscribe_self().event(
         [this](mhood_t<ping> cmd) {
            ++pings_received_;
            so_5::send<pong>(pinger_, cmd->counter_);
         });
   }

   void so_evt_finish() override {
      std::cout << "pings received: " << pings_received_ << std::endl;
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
         env.introduce_coop([](so_5::coop_t & coop) {
               auto pinger_actor = coop.make_agent<pinger>();
               auto ponger_actor = coop.make_agent<ponger>(
                     pinger_actor->so_direct_mbox());

               pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
            });
      });

   return 0;
}

All agents in the code above are working on the same work thread. How to bind them to different work threads?

It is very simple. Just use an appropriate dispatcher:

int main() {
   so_5::launch([](so_5::environment_t & env) {
         env.introduce_coop(
            so_5::disp::active_obj::make_dispatcher(env).binder(),
            [](so_5::coop_t & coop) {
               auto pinger_actor = coop.make_agent<pinger>();
               auto ponger_actor = coop.make_agent<ponger>(
                     pinger_actor->so_direct_mbox());

               pinger_actor->set_ponger(ponger_actor->so_direct_mbox());
            });
      });

   return 0;
}

Pub/Sub example

SObjectizer supports Pub/Sub model via multi-producer/multi-consumer message boxes. A message sent to that message box will be received by all subscribers of that message type:

#include <so_5/all.hpp>

using namespace std::literals;

struct acquired_value {
   std::chrono::steady_clock::time_point acquired_at_;
   int value_;
};

class producer final : public so_5::agent_t {
   const so_5::mbox_t board_;
   so_5::timer_id_t timer_;
   int counter_{};

   struct acquisition_time final : public so_5::signal_t {};

   void on_timer(mhood_t<acquisition_time>) {
      // Publish the next value for all consumers.
      so_5::send<acquired_value>(
            board_, std::chrono::steady_clock::now(), ++counter_);
   }

public:
   producer(context_t ctx, so_5::mbox_t board)
      :  so_5::agent_t{std::move(ctx)}
      ,  board_{std::move(board)}
   {}

   void so_define_agent() override {
      so_subscribe_self().event(&producer::on_timer);
   }

   void so_evt_start() override {
      // Agent will periodically recive acquisition_time signal
      // without initial delay and with period of 750ms.
      timer_ = so_5::send_periodic<acquisition_time>(*this, 0ms, 750ms);
   }
};

class consumer final : public so_5::agent_t {
   const so_5::mbox_t board_;
   const std::string name_;

   void on_value(mhood_t<acquired_value> cmd) {
      std::cout << name_ << ": " << cmd->value_ << std::endl;
   }

public:
   consumer(context_t ctx, so_5::mbox_t board, std::string name)
      :  so_5::agent_t{std::move(ctx)}
      ,  board_{std::move(board)}
      ,  name_{std::move(name)}
   {}

   void so_define_agent() override {
      so_subscribe(board_).event(&consumer::on_value);
   }
};

int main() {
   so_5::launch([](so_5::environment_t & env) {
         auto board = env.create_mbox();
         env.introduce_coop([board](so_5::coop_t & coop) {
               coop.make_agent<producer>(board);
               coop.make_agent<consumer>(board, "first"s);
               coop.make_agent<consumer>(board, "second"s);
            });

         std::this_thread::sleep_for(std::chrono::seconds(4));
         env.stop();
      });

   return 0;
}

BlinkingLed example

All agents in SObjectizer are finite-state machines. Almost all functionality of hierarchical finite-states machines (HSM) are supported: child states and handlers inheritance, on_enter/on_exit handlers, state timeouts, deep- and shallow state history, except orthogonal states.

Let's see how an agent that implements the following statechart can look like:

Blinking Led Statechart

This is a very simple example that demonstrates an agent for the statechart shown above:

#include <so_5/all.hpp>

using namespace std::literals;

class blinking_led final : public so_5::agent_t {
   state_t off{ this }, blinking{ this },
      blink_on{ initial_substate_of{ blinking } },
      blink_off{ substate_of{ blinking } };

public :
   struct turn_on_off : public so_5::signal_t {};

   blinking_led(context_t ctx) : so_5::agent_t{std::move(ctx)} {
      this >>= off;

      off.just_switch_to<turn_on_off>(blinking);

      blinking.just_switch_to<turn_on_off>(off);

      blink_on
         .on_enter([]{ std::cout << "ON" << std::endl; })
         .on_exit([]{ std::cout << "off" << std::endl; })
         .time_limit(1250ms, blink_off);

      blink_off
         .time_limit(750ms, blink_on);
   }
};

int main()
{
   so_5::launch([](so_5::environment_t & env) {
      so_5::mbox_t m;
      env.introduce_coop([&](so_5::coop_t & coop) {
            auto led = coop.make_agent< blinking_led >();
            m = led->so_direct_mbox();
         });

      const auto pause = [](auto duration) {
         std::this_thread::sleep_for(duration);
      };

      std::cout << "Turn blinking on for 10s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(10s);

      std::cout << "Turn blinking off for 5s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(5s);

      std::cout << "Turn blinking on for 5s" << std::endl;
      so_5::send<blinking_led::turn_on_off>(m);
      pause(5s);

      std::cout << "Stopping..." << std::endl;
      env.stop();
   } );

   return 0;
}

CSP-like Ping-Pong example

SObjectizer allows to write concurrent applications even without agents inside. Only plain threads and CSP-like channels can be used.

This is plain-thread implementation of Ping-Pong example (please note that main() is not exception-safe):

#include <so_5/all.hpp>

struct ping {
   int counter_;
};

struct pong {
   int counter_;
};

void pinger_proc(so_5::mchain_t self_ch, so_5::mchain_t ping_ch) {
   so_5::send<ping>(ping_ch, 1000);

   // Read all message until channel will be closed.
   so_5::receive( so_5::from(self_ch).handle_all(),
      [&](so_5::mhood_t<pong> cmd) {
         if(cmd->counter_ > 0)
            so_5::send<ping>(ping_ch, cmd->counter_ - 1);
         else {
            // Channels have to be closed to break `receive` calls.
            so_5::close_drop_content(so_5::exceptions_enabled, self_ch);
            so_5::close_drop_content(so_5::exceptions_enabled, ping_ch);
         }
      });
}

void ponger_proc(so_5::mchain_t self_ch, so_5::mchain_t pong_ch) {
   int pings_received{};

   // Read all message until channel will be closed.
   so_5::receive( so_5::from(self_ch).handle_all(),
      [&](so_5::mhood_t<ping> cmd) {
         ++pings_received;
         so_5::send<pong>(pong_ch, cmd->counter_);
      });

   std::cout << "pings received: " << pings_received << std::endl;
}

int main() {
   so_5::wrapped_env_t sobj;

   auto pinger_ch = so_5::create_mchain(sobj);
   auto ponger_ch = so_5::create_mchain(sobj);

   std::thread pinger{pinger_proc, pinger_ch, ponger_ch};
   std::thread ponger{ponger_proc, ponger_ch, pinger_ch};

   ponger.join();
   pinger.join();

   return 0;
}

Another CSP-example with Golang's like select() statement

SObjectizer provides a select() function that is similar to Golang's select statement. This function allows waiting for incoming messages from several message chains. It also allows to wait for the readiness of message chains for accepting a new outgoing message. So select() allows to do non-blocking send() calls with the handling of incoming messages while the target message chain is full.

There is a Fibonacci calculation example that uses select() as the back-pressure mechanism (number producer thread will wait if number reader thread doesn't read the previous number yet). Note also that main() function in this example is exception-safe.

#include <so_5/all.hpp>

#include <chrono>

using namespace std;
using namespace std::chrono_literals;
using namespace so_5;

struct quit {};

void fibonacci( mchain_t values_ch, mchain_t quit_ch )
{
   int x = 0, y = 1;
   mchain_select_result_t r;
   do
   {
      r = select(
         from_all().handle_n(1),
         // Sends a new message of type 'int' with value 'x' inside
         // when values_ch is ready for a new outgoing message.
         send_case( values_ch, message_holder_t<int>::make(x),
               [&x, &y] { // This block of code will be called after the send().
                  auto old_x = x;
                  x = y; y = old_x + y;
               } ),
         // Receive a 'quit' message from quit_ch if it is here.
         receive_case( quit_ch, [](quit){} ) );
   }
   // Continue the loop while we send something and receive nothing.
   while( r.was_sent() && !r.was_handled() );
}

int main()
{
   wrapped_env_t sobj;

   thread fibonacci_thr;
   auto thr_joiner = auto_join( fibonacci_thr );

   // The chain for Fibonacci number will have limited capacity.
   auto values_ch = create_mchain( sobj, 1s, 1,
         mchain_props::memory_usage_t::preallocated,
         mchain_props::overflow_reaction_t::abort_app );

   auto quit_ch = create_mchain( sobj );
   auto ch_closer = auto_close_drop_content( values_ch, quit_ch );

   fibonacci_thr = thread{ fibonacci, values_ch, quit_ch };

   // Read the first 10 numbers from values_ch.
   receive( from( values_ch ).handle_n( 10 ),
         // And show every number to the standard output.
         []( int v ) { cout << v << endl; } );

   send< quit >( quit_ch );
}

Want to know more?

More information about SObjectizer can be found in the corresponding section of the project's Wiki.

There are more useful stuff in a companion project so5extra

There is a separate companion project so5extra that contains a lot of various useful things like Asio's based dispatchers, additional types of mboxes, revocable timer, synchronous requests, and more.

For example, there is how synchronous interaction looks like (by using so_5::extra::sync stuff):

#include <so_5_extra/sync/pub.hpp>

#include <so_5/all.hpp>

// Short alias for convenience.
namespace sync_ns = so_5::extra::sync;

using namespace std::chrono_literals;

// The type of service provider.
class service_provider_t final : public so_5::agent_t
{
public :
   using so_5::agent_t::agent_t;

   void so_define_agent() override
   {
      so_subscribe_self().event(
            []( sync_ns::request_mhood_t<int, std::string> cmd ) {
               // Transform the incoming value, convert the result
               // to string and send the resulting string back.
               cmd->make_reply( std::to_string(cmd->request() * 2) );
            } );
   }
};

// The type of service consumer.
class consumer_t final : public so_5::agent_t
{
   // Message box of the service provider.
   const so_5::mbox_t m_service;

public :
   consumer_t( context_t ctx, so_5::mbox_t service )
      :  so_5::agent_t{ std::move(ctx) }
      ,  m_service{ std::move(service) }
   {}

   void so_evt_start() override
   {
      // Issue a request and wait for the result no more than 500ms.
      auto result = sync_ns::request_reply<int, std::string>(
            // The destination for the request.
            m_service,
            // Max waiting time.
            500ms,
            // Request's value.
            4 );

      std::cout << "The result: " << result << std::endl;

      so_deregister_agent_coop_normally();
   }
};

int main()
{
   so_5::launch( [](so_5::environment_t & env) {
      env.introduce_coop(
         // Every agent should work on its own thread.
         so_5::disp::active_obj::make_dispatcher( env ).binder(),
         [](so_5::coop_t & coop) {
            auto service_mbox = coop.make_agent< service_provider_t >()
                  ->so_direct_mbox();
            coop.make_agent< consumer_t >( service_mbox );
         } );
   } );
}

SObjectizer itself is intended to be a relatively small project without external dependencies. so5extra has no this constraint. That is why Asio's based dispatchers and environment infrastructures are implemented in so5extra, not in the SObjectizer.

Another significant property of SObjectizer is stability. We're trying to keep SObjectizer as stable as possible, but there is a need to try some new features, even if we don't know yet how successful and demanded they will be. so5extra is a good place to experiment with new features, some of them could be moved to the SObjectizer with time.

So if you don't find a helpful feature in the SObjectizer, let's try to look at so5extra. Maybe it is already there.

Limitations

SObjectizer is an in-process message dispatching framework. It doesn't support distributed applications just out of box. But external tools and libraries can be used in that case. Please take a look at our mosquitto_transport experiment: https://github.com/Stiffstream/mosquitto_transport

Obtaining and building

SObjectizer can be checked out from GitHub. Archives with SObjectizer's source code can be downloaded from GitHub or from SourceForge.

There are two ways for building SObjectizer. The first one by using Mxx_ru tool. The second one by using CMake.

NOTE. Since v.5.5.15.2 there is a support of Android platform. Building for Android is possible by CMake only. See the corresponding section below.

SObjectizer can also be installed and used via vcpkg and Conan dependency managers. See the appropriate sections below.

SObjectizer-5.8 requires C++17!

The 5.8-branch of SObjectizer requires C++17.

If you need support for C++14 or C++11 try to look to older versions of SObjectizer on SourceForge. Or contact stiffstream to discuss porting of SObjectizer-5.8 to older C++ standards.

Building via Mxx_ru

NOTE. This is a standard way for building SObjectizer. This way is used in SObjectizer development process.

To build SObjectizer it is necessary to use Ruby language and Mxx_ru tool. Install Ruby and then install Mxx_ru via RubyGems command:

gem install Mxx_ru

If you already have Mxx_ru installed please update to at least version 1.6.14.6:

gem update Mxx_ru

SObjectizer can be obtained from Git repository on GitHub:

git clone https://github.com/stiffstream/sobjectizer

To build SObjectizer:

cd sobjectizer/dev
ruby build.rb

Static and shared library for SObjectizer will be built. Libraries will be placed into target/release subdirectory.

If you want to build just shared library:

cd sobjectizer/dev
ruby so_5/prj.rb

Or if you want to build just static library:

cd sobjectizer/dev
ruby so_5/prj_s.rb

To build SObjectizer with all tests and samples:

cd sobjectizer/dev
ruby build_all.rb

Please note that under FreeBSD it could be necessary to define LD_LIBRARY_PATH environment variable. And the actual build command sequence under FreeBSD could be as follows:

cd sobjectizer/dev
export LD_LIBRARY_PATH=target/release
ruby build_all.rb

To build html-format documentation for SObjectizer the Doxygen tool is necessary. If it is installed then:

cd sobjectizer/doxygen
doxygen

Generated html-files will be located in sobjectizer/dev/doc/html.

NOTE. If you do not specify MXX_RU_CPP_TOOLSET by youself then Mxx_ru will try to detect your C++ toolset automatically. If you want to use C++ compiler which is not default in your system please define MXX_RU_CPP_TOOLSET environment variable manually. It could look like:

export MXX_RU_CPP_TOOLSET="clang_linux compiler_name=clang++-6 linker_name=clang++-6"

More information about tuning Mxx_ru for your needs you can find in the corresponding documentation.

Building via CMake

To build SObjectizer via CMake it is necessary to have CMake and some knowledge of how to use it. The following action is just a demonstration. For more detailed info about cmake build system for SObjectizer see dev/cmake/CmakeQuickHowto.txt

To get and build SObjectizer under Linux/FreeBSD in command line run:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release ../dev
cmake --build . --config Release
cmake --build . --config Release --target install

Those commands will create all necessary Makefile, then build SObjectizer. If it necessary to build examples and tests too, use

cmake -DBUILD_ALL=ON -DCMAKE_INSTALL_PREFIX=target ../dev

When 'make install' finished './target' will contain two subfolders './bin' with samples and './lib' with shared libso.5.x.x.so

CMake build system currently supports this options:

  • SOBJECTIZER_BUILD_STATIC. Enable building SObjectizer as a static library [default: ON]
  • SOBJECTIZER_BUILD_SHARED. Enable building SObjectizer as a shared library [default: ON]
  • BUILD_ALL. Enable building examples and tests [default: OFF]
  • BUILD_EXAMPLES. Enable building examples [default: OFF]
  • BUILD_TESTS. Enable building tests [default: OFF]

Please note that if BUILD_ALL or BUILD_EXAMPLES or BUILD_TESTS is turned ON then both SOBJECTIZER_BUILD_STATIC and SOBJECTIZER_BUILD_SHARED must be turned ON. It means that if SOBJECTIZER_BUILD_STATIC or SOBJECTIZER_BUILD_SHARED is turned OFF then BUILD_ALL/BUILD_EXAMPLES/BUILD_TESTS all must be turned OFF.

To build SObjectizer under Windows by MS Visual Studio 2013 from command line:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -G "Visual Studio 15 2017" ../dev
cmake --build . --config Release
cmake --build . --config Release --target install

If it necessary to build examples too, use BUILD_ALL in cmake invocation:

cmake -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release -DBUILD_ALL=ON -G "Visual Studio 15 2017" ../dev

Since v.5.5.24 SObjectizer provides sobjectizer-config.cmake files. These files are automatically installed into <target>/lib/cmake/sobjectizer subfolder. It allows to use SObjectizer via CMake's find_package command.

Building for Android

Building for Android is possible via a rather fresh Android NDK or CrystaX NDK.

Building with Android NDK

You need Android SDK and Android NDK installed in your system. As well as an appropriate version of CMake. You have also need properly set environment variables ANDROID_HOME, ANDROID_NDK. Then you can issue the following commands:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
cmake -DBUILD_ALL -DCMAKE_INSTALL_PREFIX=target -DCMAKE_BUILD_TYPE=Release \
     -DCMAKE_TOOLCHAIN_FILE=${ANDROID_NDK}/build/cmake/android.toolchain.cmake \
     -G Ninja \
     -DANDROID_ABI=arm64-v8a \
     -DANDROID_NDK=${ANDROID_NDK} \
     -DANDROID_NATIVE_API_LEVEL=23 \
     -DANDROID_TOOLCHAIN=clang \
     ../dev
cmake --build . --config=Release
cmake --build . --config=Release --target install

Building with CrystaX NDK

You need CrystaX NDK v.10.4.0 or higher already installed in your system. CMake is used for building SObjectizer:

git clone https://github.com/stiffstream/sobjectizer
cd sobjectizer
mkdir cmake_build
cd cmake_build
export NDK=/path/to/the/crystax-ndk
cmake -DBUILD_ALL -DCMAKE_INSTALL_PREFIX=result -DCMAKE_TOOLCHAIN_FILE=$NDK/cmake/toolchain.cmake -DANDROID_ABI=arm64-v8a ../dev
make
make test
make install

Using C++ Dependency Managers

Using via vcpkg

To use SObjectizer via vcpkg it is necessary to do the following steps.

Install sobjectizer package:

vcpkg install sobjectizer

Add the following lines into your CMakeLists.txt file:

find_package(sobjectizer CONFIG REQUIRED)
target_link_libraries(your_target sobjectizer::SharedLib) # or sobjectizer::StaticLib

Using via Conan

NOTE. Since Feb 2021 new versions of SObjectizer are available via conan-center only.

Installing SObjectizer And Adding It To conanfile.txt

To use SObjectizer via Conan it is necessary to add SObjectizer to conanfile.txt of your project:

[requires]
sobjectizer/5.8.0

It also may be necessary to specify shared option for SObjectizer. For example, for build SObjectizer as a static library:

[options]
sobjectizer:shared=False

Install dependencies for your project:

conan install SOME_PATH --build=missing

Adding SObjectizer To Your CMakeLists.txt

...
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup()

...
target_link_libraries(your_target ${CONAN_LIBS})

License

SObjectizer is distributed under 3-clause BSD license. For license information please see LICENSE file.

sobjectizer's People

Contributors

bsivko avatar dmsck avatar eao197 avatar ngrodzitski avatar zamazan4ik avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

sobjectizer's Issues

Can't test an agent that creates a child coop

Hello, I am using sobjectizer in my work project and trying to write some tests for a simple agent using testing_env_t testing environment. I already tested another agent without issues but when I try to test an agent that creates child cooperations I got an error.

Some environment info:
sobjectizer 5.7.2 installed from conan
gcc 11

Simple test agent:

class ChildAgent : public so_5::agent_t {
public:
  explicit ChildAgent(context_t ctx) : so_5::agent_t(std::move(ctx)) {}
};

class TestAgent : public so_5::agent_t {
public:
  explicit TestAgent(context_t ctx) : so_5::agent_t(std::move(ctx)) {}

  void so_evt_start() override {
    so_5::introduce_child_coop(*this, [](so_5::coop_t &coop) {
      coop.make_agent<ChildAgent>();
    });
  }
};

Test code looks like this:

TEST_CASE("Simple agent test") {
  so_5::experimental::testing::testing_env_t testing;
  testing.environment().introduce_coop([](so_5::coop_t &coop) {
    coop.make_agent<TestAgent>();
  });

  // After executing the scenario an exception is thrown
  testing.scenario().run_for(std::chrono::milliseconds(100));
}

When I run this test I get an exception from sobjectizer:

SObjectizer event exception caught: (.../sobjectizer/dev/so_5/impl/coop_repository_basis.cpp:108): error(28) a new coop can't be registered when shutdown is in progress; cooperation: {coop:id=2}

When I remove so_5::introduce_child_coop from TestAgent::so_evt_start then all is ok.

Am I doing something wrong? How could I test an agent with a child cooperation created inside it?

[idea] Registration of user-provided mboxes as named ones

The current version of environment_t::create_mbox(name) always returns a mbox created by SObjectizer. It seems that we can provide a possibility to user to register own mbox as a named one.

Something like:

so_5::mbox_t custom_mbox = so_5::extra::mboxes::unique_subscribers::make(env);
env.register_named_mbox("my_mbox", custom_mbox);
...
auto mbox = env.create_mbox("my_mbox"); // the custom_mbox will be returned.

The changes to SObjectizer can look like:

namespace so_5
{

namespace named_mbox
{

enum class not_unique_name_reaction_t
{
  throw_exception,
  dont_throw_exception
};

constexpr auto throw_if_not_unique = not_unique_name_reaction_t::throw_exception;
constexpr auto nothrow_if_not_unique = not_unique_name_reaction_t::dont_throw_exception;

} /* namespace named_mbox */

class environment_t
{
public:
    ...
    // Returns false if name isn't unique and reaction is dont_throw_if_not_unique.
    bool
    register_named_mbox(nonempty_name_t mbox_name,
      named_mbox::not_unique_name_reaction_t reaction = named_mbox::throw_if_not_unique);
    ...
};

} /* namespace so_5 */

state_t's on_enter/on_exit should accept noexcept methods

For example:

class my_agent : public so_5::agent_t {
  state_t st_normal{ this };
  ...
  void on_enter_st_normal() noexcept {...}
  ...
  void so_define_agent() override {
    st_normal.on_enter(&my_agent::on_enter_st_normal); // (1)
    ...
  }
};

Now the compilation fails at point (1) because in C++17 noexcept is a part of the method signature and that case is not handled in SObjectizer's is_agent_method_pointer metafunction.

Mbox with contention?

Hi,
I hope it's the right place for asking questions.
Please, forgive me if this question is silly, I am learning the basics of SObjectizer.

I have understood that a Mbox enables to dispatch messages to several consumers. As explained here, every consumer receives a copy of each message.

Keeping N consumers, I am wondering if it's possible to have, instead, that every message is received and processed exactly by one consumer, the "first that is free" (clearly, this is a simplification). This way we have contention on the Mbox. It's the classical pattern "N jobs, M workers".

How can I properly model this pattern with SObjectizer?

Many thanks,

Marco

Either passing channel or channel name

Hi,
quick design & style-related question: I have a bunch of agents that publish some signals to some channels (every agent to its own channel):

agent1 -> channel1
agent2 -> channel2
agent3 -> channel3
...

Such agents are dynamic during the lifetime of the application. Signals are sent sporadically.

Now, I need a global agent - let's call it Monitor - to subscribe to all of them, even if they are not known in advance. My idea is to simply have a known channel the Monitor subscribes to - let's call it NotificationChannel. Every time a new agent comes out, it sends a notification to NotificationChannel, this way Monitor can subscribe to the specific channel.

Monitor subscribes to NotificationChannel
...
agent1 sends a notification containing channel1 to NotificationChannel
Monitor subscribes to channel1
...

Two questions:

  • (design-related) what do you think of this approach?
  • (style-related) what would you pass to that NotificationChannel, a reference to the channel itself (e.g. mbox_t) or just the channel name?

Many thanks!

Compilation error with Clang 10

Hi!
SObjectizer 5.7.0 cannot be compiled with Clang 10 (seems like the reason is in the updated libstdc++). I've got the following errors:

In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.cpp:13:
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:427:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:458:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:491:37: error: no member named 'string_view' in namespace 'std'
                return make_dispatcher( env, std::string_view{}, thread_count );
                                             ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/thread_pool/pub.hpp:524:10: error: no member named 'string_view' in namespace 'std'
                                std::string_view{},
                                ~~~~~^
In file included from /home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.cpp:12:
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:412:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:443:13: error: no type named 'string_view' in namespace 'std'
        const std::string_view data_sources_name_base,
              ~~~~~^
/home/zamazan4ik/.conan/data/sobjectizer/5.7.0/stiffstream/stable/build/ae1b6d0dfb9e84f72c2951a33321a99e6c04a334/sobjectizer/dev/so_5/disp/adv_thread_pool/pub.hpp:476:37: error: no member named 'string_view' in namespace 'std'
                return make_dispatcher( env, std::string_view{}, thread_count );
                                             ~~~~~^
3 errors generated.

As far as I see you just forgot to include #include <string_view> in pub.hpp

Hope it'll help.

The application consumes almost 100% CPU (1 thread) all the time.

Hi,
I am using SObjectizer in our projects. I have some observation on the CPU utilisation. The following is a simple code with one thread handling messages on 2 channels.
// Channel 1 -> process a request and post a message to channel 2
// Channel 2 -> process a request and post a message to channel 1
And the loop continues.

Observation: The application consumes almost 100% CPU (1 thread) all the time.

Query: Please help make it CPU efficient / event based? I admit I am still learning the SObjectizer library.

Sample Code:

#include <thread>
#include <memory>
#include <random>

// SObjectizer is one of a few cross-platform and OpenSource "actor frameworks" for C++. 
// But SObjectizer supports not only Actor Model, but also Publish-Subscribe Model and CSP-like channels. 
// ver 5.7
#include <so_5/all.hpp>

// Message structure
struct poll_device_events_request_t { };

void thread_func(so_5::mchain_t primary_channel, so_5::mchain_t secondary_channel)
{
  // The stuff necessary for random pause generation.
  std::random_device rd;
	std::mt19937 generator{rd()};
	std::uniform_int_distribution<> pause_generator{5000, 10000}; // Delay between 5 - 10 seconds.
  
  // This flag will be set to 'true' when some of channels will be closed.
	bool stop = false;
  auto prepared = so_5::prepare_select(
    so_5::from_all().handle_all()
      // If some channel gets closed we should set out 'stop' flag.
      .on_close([&stop](const auto &) { stop = true; })
      // A predicate for stopping select() function.
      .stop_on([&stop]{ return stop; }),

    // Read and handle poll_device_events_request messages from channel_.
    so_5::receive_case(primary_channel, 
      [&](poll_device_events_request_t){
        
        std::cout << "Received message on primary channel..." << std::endl;
        // Do some work.
        
        // Generate a random pause.
        const std::chrono::milliseconds pause{pause_generator(generator)};
        
        // Send a message to secondary channel - delayed message.
        so_5::send_delayed<poll_device_events_request_t>(secondary_channel, pause);
      }
    ),
    so_5::receive_case(secondary_channel, 
      [&](poll_device_events_request_t){
        std::cout << "Received message on secondary channel..." << std::endl;
        // Do some work.
        
        // Generate a random pause for processing of that request.
        const std::chrono::milliseconds pause{pause_generator(generator)};
        
        // Delay processing of that request by using delayed message.
        so_5::send_delayed<poll_device_events_request_t>(primary_channel, pause);
      }
    )
  );
  
  while (!stop) {
    so_5::select( prepared );
  }
}

// Let's create two channels - 
// Channel 1 -> process a request and post a message to channel 2
// Channel 2 -> process a request and post a message to channel 1
  
int main() {
  std::shared_ptr<so_5::wrapped_env_t> s_node_env_obj_ = std::make_shared<so_5::wrapped_env_t>();
  
  auto primary_channel_ = so_5::create_mchain(*s_node_env_obj_);
  auto primary_channel_closer = so_5::auto_close_drop_content(primary_channel_);
	
  auto secondary_channel_ = so_5::create_mchain(*s_node_env_obj_);
  auto secondary_channel_closer = so_5::auto_close_drop_content(secondary_channel_);
  
  // Thread objects for workers.
	std::thread nthread_ {thread_func, primary_channel_, secondary_channel_};
	auto nthread_joiner = so_5::auto_join( nthread_ );

  // Initiate the first message.
  so_5::send<poll_device_events_request_t>(primary_channel_);

  nthread_.join();
  return 0;
}

Issue in README.md

Hello. In your README.md you have next lines:

Since v.5.5.24 SObjectizer provides sobjectizer-config.cmake files.
These files are automatically installed into <target>/lib/cmake/sobjectizer
subfolder. It allows to use SObjectizer via CMake's find_package command.

You have <target> that doesn't render on GitHub as in Markdown something between <> is a quick link...

Default dispatcher binder of child coops

There is a way to set dispatcher binder for coop. This binder will be used for all agents of that coop by default instead of the default binder from environment infrastructure. Is there a way to force child coops of that coop use this binder by default too?

Best practices to get a timeout?

Hi everyone,
my adventures with SObjectizer continue and I am very happy with it.
I have a question that I suppose is related to agent states, but I need your expert advise.

Suppose I have an agent that subscribes to a mailbox of images (e.g. OpenCV Mat) and shows them in real-time with OpenCV's imshow (it's just for troubleshooting and not intended for production):

//                                                   v-- this is by value for simplicity
so_subscribe(channel).event([](cv::Mat image) {
     imshow("Live Show", image);
     cv::waitKey(1);
});

The problem with this code is that not receiving images for a some time causes the window to hang (the behavior is intended because of cv::waitKey). In another implementation (not based on SObjectizer) I have a way to get into another lambda when the channel does not get data for a specified amount of time.

Imagine something like this:

so_subscribe(channel)
           .event([](cv::Mat image) {
                imshow("Live Show", image);
                cv::waitKey(1);
           })
           .on_timeout(200ms, []{
                cv::destroyAllWindows();
           });

What is the best way to get something like this in SObjectizer?

Many thanks!

Marco

Get dispatcher by name from an environment

Hello.

Is it possible to get a dispatcher from an SObjectizer environment with name (like it can be done already with mbox)? Seems like it's not available for now.

Why it can be useful? The same reason as for mboxes - if I want to create an agent on some specific dispatcher and don't want to pass through many layers dispatcher binder.

What do you think about such feature?

Thank you.

Backwards compatible way of closing mchains

Hi!
I have just seen the new release and I have a small suggestion for the wiki.

Since abstract_message_chain_t::close(close_mode) is now deprecated, I would explicitly mention on the release notes (wiki) how to replace this call (and related ones) with the new one in order to get the very same behavior.

For example:

close_retain_content(chain);

is now equivalent to:

close_retain_content(so_5::exceptions_enabled, chain);

What do you think?

Use so_default_state as a parent state for an another state

Hi!

Is it possible with SObjectizer to use default agent state (which is returned with so_default_state) as a parent state for another state? I am trying to do something like this:

state_t NotStarted{initial_substate_of(so_default_state()), "NotStarted"};

Unfortunately initial_substate_of takes a non-const reference but so_default_state returns const reference, so it doesn't compile.

Is there any workaround (except introducing own "default" state and making it as a parent for another events)? Is it a limitation by design?

Thank you.

[Best practice] How to unsubscribe from all remaining messages on shutdown?

Hi,
suppose I have an agent that can be slow at processing incoming messages. Under some circumstances, I don't want to wait for it to complete when shutdown is sent from the environment, instead I want to terminate it immediately.

The agent is subscribed to a SPMC message box that might contain several messages already. I just want to ignore them all when shutdown is requested.

I had a look at stop guards but I am not sure this is the way to go (it seems to me that they solve exactly the opposite problem).

What's the best practice to get such a behavior?

Let's start from a skeleton of that agent:

class SlowAgent : public so_5::agent_t
{
public:
	SlowAgent(context_t c, so_5::mbox_t input)
		: agent_t(std::move(c)), m_input(std::move(input))
	{
	}

protected:
	void so_define_agent() override
	{
		so_subscribe_self().event([](int i) {
			// just fake some slow processing...
			std::cout << "doing " << i << "\n";
			Sleep(1000);
			std::cout << "done\n";
		});
	}
	
private:
	so_5::mbox_t m_input;
};

Thanks!

A possibility to subscribe a event-handler to a base type of a message

NOTE. This issue is just a reminder that SObjectizer needs a solution for use-cases like described in #24
I don't know how and when this will be implemented (and can it be implemented at all), but the reminder will help to form scope for a new SObjectizer's release when appropriate resources will be available.

The task is to allow writing something like that:

class image_base : public so_5::message_t {...};
class image_vendor_A : public image_base {...};
class image_vendor_B : public image_base {...};
class image_vendor_C : public image_base {...};

// The first handler.
void first_handler::so_define_agent() {
  so_subscribe_self()
    // Handles only one type of images.
    .event([this](mhood_t<image_vendor_A> cmd) {...})
    // All other messages are just logged.
    .event([this](mhood_t<image_base> cmd) { log_image(*cmd); });
}
...
// Another handler.
void second_handler::so_define_agent() {
  so_subscribe_self()
    // Handles two types.
    .event([this](mhood_t<image_vendor_B> cmd) {...})
    .event([this](mhood_t<image_vendor_C> cmd) {...})
    // All other messages are just redirected as is.
    .event([this](mhood_t<image_base> cmd) {
      so_5::send(new_dest, cmd);
    });
}

// Yet another handler.
void third_handler::so_define_agent() {
  // Redirects all messages to another destination.
  so_subscribe_self().event([this](mhood_t<image_base> cmd) {
    so_5::send(new_dest, cmd);
  }
}

Additional requirement. It'll be great to have that functionality for handling messages from mchains too:

so_5::receive(from(mchain).handle_all(),
  [](so_5::mhood_t<image_vendor_A> cmd) {...},
  [](so_5::mhood_t<image_vendor_C> cmd) {...},
  [new_dest](so_5::mhood_t<image_base> cmd) { so_5::send(new_dest, cmd); });

Add custom handler for events for which there is no any handler in current state

Hi!

I am trying to use FSM functionality, which is already integrated into SObjectizer (btw thatnk you a lot for such architecture design - that's very useful in an agent development).

I understand, how can I switch between states, how can I add different event handlers for different states.

In my situation I should add custom reaction (more precisely - sent a message to another agent, which is used for error logging) for events, which is sent to an agent and were not handled since for them were not found any handler (e.g. an event A is not allowed in a state ST and I want to notify about such situation my error-handling agent).

Is there any way for doing such stuff? E.g. in Boost.SML it can be done. See https://boost-experimental.github.io/sml/examples.html#error-handling - work with unexpected_event (btw if you see any features, which are missed in SObjectizer implementation of FSM and there are in Boost.SML, would be fine to add them into SObjectizer as well :) )

I've already checked an example about no_handler_message_tracing: https://github.com/Stiffstream/sobjectizer/blob/master/dev/sample/so_5/nohandler_msg_tracing/main.cpp
But I don't think that it is usable in my situation.

Thank you.

Drop oldest agent message limit?

Hi everybody,
speaking about message limits, I know that limit_then_drop makes the agent ignore new messages. I am wondering if it's possible to drop oldest messages somehow (as it happens with message chains' overflow policy so_5::mchain_props::overflow_reaction_t::remove_oldest).

Many thanks!

Marco

Template error on redirecting signals

Hi everyone,
just to report that redirecting a signal gives a compilation error on Visual Studio 2019 with C++17 enabled (tested on version 16.10.1):

class redirector : public so_5::agent_t {
	...
	void on_some_immutable_signal(mhood_t<some_signal> cmd) {
		so_5::send(another_mbox, cmd);
		...
	}
};

This causes the compiler to complain on line 297 of send_functions.hpp:

error C2059: syntax error: 'template'

I'm using the latest version of SObjectizer (5.7.2.5).

I have not investigated the issue but it seems a bug of the compiler. Is that something happening also on others?

handle_n for mbox?

Hi,
what is your recommended practice to wait for getting exactly N messages from a mailbox? Imagine something similar to handle_n for message chains. I think I have read something about on the wiki but I cannot remember where...

Many thanks.

Marco

A possibility to drop all subscriptions with total deactivation of an agent?

NOTE: it just an idea inspired by #28 . It may not get any implementation. I fix it in issues to return to it when there will be a possibility.

Sometimes it's necessary to deactivate some agent and keep it in that deactivated state until the coop of the agent will be deregistered.

A developer can do that by using an additional state without any subscriptions:

class demo : public so_5::agent_t
{
  state_t st_deactivated{this};
  ...
  void on_some_event() {
    if(has_to_be_deactivated())
      this >>= st_deactivated; // Agent won't react to new messages anymore.
    ...
  }
};

But this approach has a drawback: anyone can still send messages to that agent and some resources will be consumed. It's because all subscriptions of the agent remain valid and messages sent to the deactivated agent will be stored to the agent's event-queue, extracted, checked for the presence of an event-handler in the current state, and only then those messages will be ignored.

It seems that SObjectizer can do better. There is already a special invisible state awaiting_deregistration_state that is used when an agent throws an exception, but the exception reaction is set to deregister_coop_on_exception. So we can provide a new agent_t's method so_deactivate_agent that does the following actions:

  • drops all the subscriptions to avoid resources consumptions if someone tries to send a message to the deactivated agent;
  • switches the agent to awaiting_deregistration_state. In that state, the agent will wait for the deregistration of its coop.

Or we can just add a new method so_drop_all_subscriptions to remove all subscriptions regardless of the states. Including deadletter_handlers.

User payload for event handler subscription

SObjectizer allows to prioritize agents, but often it's not enough. There are two another prioritization levels. First one is level of messages. Dispatcher can define specific class (derived from message_t) to inherit prioritized messages from and resolve message priority by this class. This is great, but may be too expensive sometimes. Another one is level of event handlers. It is powerful enough and cheap, also it is best solution for my case. Unfortunately there is no way to use that level at this moment. I had to patch SObjectizer to pass priority through subscription_bind_t::event and store it in execution_hint_t, but i wish SObjectizer had native way to do it.

Optional argument of subscription_bind_t::event for user payload looks like suitable way that doesn't break backward compatibility. Also it may allow other possibilities for custom dispatchers.

Agent shutdown

Hi,
I have a simple question: is it possible to stop an agent from inside of that agent? In other words, can an agent signal to the environment that it wants to be stopped/terminated/finished in a normal way?

so_deregister_agent_coop_normally() and so_deregister_agent_coop() are close to what I need, however such functions deregister the entire cooperation. My current approach is based on parent-child relations:

  • create a "root" cooperation
  • for each agent, create a child cooperation and register the agent at that
  • when an agent needs to be deregistered, call so_deregister_agent_coop_normally() (this will result in shutting down that agent only)

Is that a good approach?

Just to give you an example of why I need this: I have an agent that communicates with a child process. When the child process sends back a particular data or it just terminates, that agent must be stopped.

Many thanks.

PS I hope to see you in 1-hour at the meetup :)

Classical endless loop?

Hi,
this is probably a simple question but I am wondering which are your recommended answers.

I need to encapsulate a classical endless loop into a SObjectizer agent. Something like this:

class Worker
{
public:
   Worker()
   {
     m_thread = [this]{
         while (!m_stop)
         {
            // ... do something
         }
      };
   }

   void StopAndWait()
   {
      m_stop = true;
      m_thread.join();
   }

private:
   thread m_thread;
   atomic<bool> m_stop = false;
};

Also, I would like to remove "StopAndWait" and let SObjectizer handle shutdown automatically as it happens normally.

I guess this can be modeled with states. Do you recommend any common implementations?

Many thanks.

Handling any message type

Hi,
I have a very simple question: is it possible to make a subscription handler that will be called regardless of the message type?

Something like this:

so_subscribe(board_).event([](any_message_t m){
   // do something
});

// ...
send<int>(board_, 5); // handler called
send<MyType>(board_, MyType{}); // handler called
// ...

What's the recommended practice here?

Thanks.

so_set_delivery_filter and lvalue references

It seems that this code can't be compiled with the current version of SObjectizer:

void so_define_agent() override {
  auto filter = [](const some_message &) { return true; };
  so_set_delivery_filter(mbox, filter);
}

because so_set_delivery_filter expects a rvalue reference to lambda-function.

Maybe it's necessary to fix so_set_delivery_filter to allow acceptance of lvalue references (const and non-const)?

Comparison operators for coop_handle_t

The type coop_handle_t plays a role similar to the smart-pointer but doesn't have comparison operators. It could lead to unexpected result in the following case:

so_5::coop_handle_t h1 = ...;
so_5::coop_handle_t h2 = ...;
if(h1 == h2) { // OOPS!
   ...
}

because the results of conversion from coop_handle_t to bool will be compared in if(h1 == h2).

So it's can be desirable to have comparison operators for coop_handle_t.

Generic message limits for all message types

Hello.

I've read about message limits https://github.com/Stiffstream/sobjectizer/wiki/SO-5.7-InDepth-Message-Limits and a post on Habrahabr about collector-performer pattern.

But I am a little bit wondered, that message limits can be defined only for specific messages. Seems like it's not possible (at least in easy way) define message limits for all message types in a mbox.

Is there any way for doing such thing in SObjectizer? For now I see only one option: inherit all message types from some class generic_message_t and then define limit_and_X<generic_message_t> (I hope it works).

Timers problem when Boost is also used

Hi!

Thank you for very cool library!

I have tried to use your library with Boost, and I am confused due to the build is failed even I only add Boost library without its real using.

I started from building my application without Boost, and the build was successful, then I added boost/1.73.0 in requirements to my conanfile.txt, and builds started failing. The problem was raised on the linking stage:

CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `so_5::details::event_subscription_helpers::ensure_handler_can_be_used_with_mbox(so_5::details::msg_type_and_handler_pair_t const&, so_5::intrusive_ptr_t<so_5::abstract_message_box_t> const&)':
main.cpp:(.text._ZN4so_57details26event_subscription_helpers36ensure_handler_can_be_used_with_mboxERKNS0_27msg_type_and_handler_pair_tERKNS_15intrusive_ptr_tINS_22abstract_message_box_tEEE[_ZN4so_57details26event_subscription_helpers36ensure_handler_can_be_used_with_mboxERKNS0_27msg_type_and_handler_pair_tERKNS_15intrusive_ptr_tINS_22abstract_message_box_tEEE] 0xc0): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'
CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `Agent* so_5::details::event_subscription_helpers::get_actual_agent_pointer<Agent>(so_5::agent_t&)':
main.cpp:(.text._ZN4so_57details26event_subscription_helpers24get_actual_agent_pointerI5AgentEEPT_RNS_7agent_tE[_ZN4so_57details26event_subscription_helpers24get_actual_agent_pointerI5AgentEEPT_RNS_7agent_tE] 0xbd): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'
CMakeFiles/SObjectizer-error.dir/src/main.cpp.o: In function `so_5::message_payload_type_impl<Updater, false>::extract_payload_ptr(so_5::intrusive_ptr_t<so_5::message_t>&)':
main.cpp:(.text._ZN4so_525message_payload_type_implI7UpdaterLb0EE19extract_payload_ptrERNS_15intrusive_ptr_tINS_9message_tEEE[_ZN4so_525message_payload_type_implI7UpdaterLb0EE19extract_payload_ptrERNS_15intrusive_ptr_tINS_9message_tEEE] 0x98): undefined reference to `so_5::exception_t::raise(char const*, unsigned int, std::string const&, int)'

I have also tried to use boost/1.69.0 and boost/1.71.0, but it did not have any effects: the error messages are, and they are similar.

I created simple repository with my code where the error is: https://github.com/FulgurIgor/SObjectizer-error. It has Travis-CI build, where the error is demonstrated:

Source code: https://github.com/FulgurIgor/SObjectizer-error/blob/master/src/main.cpp

Tested systems:

  • Fedora 32, GCC 10.1, SObjectizer/5.7.1, Boost/1.73.0 (and also Boost/1.71.0 and Boost/1.69.0).
  • Ubuntu Bionic, GCC 7.4, SObjectizer/5.7.1, Boost/1.73.0

All builds use Conan and CMake.

Add example for the usage of SObjectizer's layers.

There is no example that shows the usage of SObjectizer's layers in the standard SObjectizer's examples. It will be good to have at least one such example (code snippet from #6 can be used for a very simple one).

Не удается включить work_thread_activity_tracking через params_tuner в wrapped_env_t

Здравствуйте, при попытке включить work_thread_activity_tracking:
so_5::wrapped_env_t env{..., [](auto& p){ p.turn_work_activity_tracking_on();});
auto result = env->environment().work_thread_actovity_tracking();
Ожидается, что result == on, но получается: result == unspecified.
Возможно значение флага теряется внутри конструктора копирования класса environment_params_t(environment.cpp):
... , m_work_thread_activity_tracking(work_thread_activity_tracking:: unspecified) ...
Это не выглядит правильным поведением.
Версия: 5.7.0 но мне кажется, что будет воспроизводиться и в последней.
Заранее спасибо.

Method unsubscribe_event_handlers for mbox has to be noexcept

The method abstract_message_box_t::unsubscribe_event_handlers is not noexcept now (in SO-5.5/5.6/5.7). But this method is often called in noexcept contexts (like destructors or catch blocks). So it seems that unsubscribe_event_handlers has to be noexcept since 5.8.0.

[idea] A possibility to set a custom mbox to be returned via agent_t::so_direct_mbox()?

This is just an idea. It's fixed here to be investigated and, maybe, implemented in some future version of SObjectizer.

Every agent in SObjectizer has its MPSC mbox called direct mbox. This mbox is automatically created by SOEnv for every new agent. That mbox is available via agent_t::so_direct_mbox() method.

The type of that mbox is hidden from a user. A user only knows that it's an MPSC mbox.

Sometimes it can be necessary to hide the agent direct mbox and to expose some proxy instead (for example, a proxy that filters or reorders messages of some types), but it's impossible to do because anyone who has a reference to an agent can call so_direct_mbox, and so_direct_mbox isn't a virtual method.

I think that SObjectizer should have a way to specify a custom MPSC mbox to be used as the agent's direct mbox. But I don't know how to do that at the moment. It requires further thinking and prototyping.

Any way to wait until an agent is ready to receive messages?

Hi,
I have a very simple question. Suppose I am testing an application where I have some agents registered to a global environment.
I would like to be sure that such agents are ready to get messages from channels they have subscribed to. This way I can generate fake messages and make expectations.

At the moment, I just register them and wait - say - 10ms to be on the safe side. However, this is not portable and can fail sporadically on CI builds. In general, I don't like "sleep" of any sort.

Is it possible to wait for such a "state" without adding custom logic?
For example, agents have that ensure_binding_finished. Is this what I am looking for?

For the moment, I can't use the experimental "testing" capabilities in SObjectizer since I have some stable code that can't be easily accommodated to the "scenario" semantics.

Thanks!

Use different message types or different mboxes?

I'm quite impressed by SObjectizer and I want to convert my application to use it. However, at the moment I'm having problems to understand the design I should follow when defining messages and mboxes:

When should I use different message types and when should I use the same types on different mailboxes?

My application controls a number of identical devices (the devices are not known until start-up). Most of the messages to and from the devices are of the same basic type (a triplet of real numbers), although they mean different things. My idea is to have several actors which listen to messages from all devices and do some state estimation and a few which consume both estimations and observations and act a controllers.

In my message broker (MQTT) I have one topic per device (e.g. "device1/measured", "device1/setpoint", "device1/status") because that's the way MQTT is supposed to be used, but I don't believe it is a good idea to have one mbox per device in the application.

I guess my options are:

  1. One generic "board" mbox, with different message subclasses to identify the topic of the message (eg, define "Measured", "Setpoint", "Status", and derive both "Measured" and "Setpoint" from the same class).
  2. One mbox per topic, use the same class for messages whose data is essentially the same (e.g. have a "Measured" and "Setpoint" mboxes, and publish the same type of objects to both).
  3. A combination of (1) and (2): several mboxes and different message classes too.

Regards,

Juan.

[idea] Passing of some message_sink_t to subscribe_event_handler instead of agent_t

NOTE. This idea is for the next big release of SObjectizer (probably breaking compatibility with 5.7-branch). It's unknown at the moment when work on that big release could be started.

In the current version of SObjectizer, only agents can make subscriptions to messages passed to message boxes. It's reflected in the format of abstract_message_box_t::subscribe_event_handler method:

//! Add the message handler.
virtual void
subscribe_event_handler(
	//! Message type.
	const std::type_index & type_index,
	//! Optional message limit for that message type.
	const message_limit::control_block_t * limit,
	//! Agent-subscriber.
	agent_t & subscriber ) = 0;

This prevents some interesting tricks like the chaining of mboxes. For example, there could be a standard MPMC mbox to that some messages are sent. And an instance of so_5::extra::mboxes::round_robin mbox can be subscribed to that MPMC mbox as a subscriber. And one of subscribers to round_robin mbox can be a so_5::extra::mboxes::retained_mbox. And so on.

It seems that there could be an abstract interface named message_sink_t that can be used as a destination for messages. Class agent_t will implement that interface. Message mboxes and message chains can implement that interface too.

It would make possible to subscribe a mbox to another mbox. And that opens a possibility to create chains of mboxes like the one described above.

I think it can make SObjectizer yet more flexible. But this idea is at the very early stage and needs additional thinking and experimenting.

Allow the creation of intrusive_ptr_t from a pointer to agent

Class so_5::agent_t has atomic_refcounted_t as private base class. It means that it isn't possible to create a shared pointer (in the form of so_5::intrusive_ptr_t<T>) for an agent. But sometimes it can be useful for the cases when the lifetime of an agent should be extended for some time after the deregistration of the agent from SObjectizer. For example, if a pointer to the agent is passed to some callback (like callbacks in Asio).

If this feature is really useful it can be implemented in different ways:

  • atomic_refcounted_t can be a public base class for agent_t;
  • some helper function template (like make_agent_ref<T>) + intrusive_ptr_t<T> as friend for agent_t.

How to safely catch and forward a OS signal (SIGINT, SIGHUP, etc)

I need to be able to stop the environment when the application receives a SIGINT/SIGQUIT and send a message to a component to reload the configuration when SIGHUP is received.

Doing this seems to work:

  auto stop_app = [](int signal) -> void {
      main_env->stop();
  };
...
std::signal(SIGTERM, stop_app);

However, reading the code in environment_t::stop() I suspect it is not safe to call from the context of an async signal. The same goes for sending a message to a mbox/mchain, AFAIU there is the possibility of a deadlock if the signal just happens to be delivered while the message box mutex is being held.

How can OS signals be handled safely. I can only think of a few options:

  • Have a dedicated thread to handle signals.
  • Make a custom implementation of mchain that uses a self-pipe.
  • Periodically poll signals through a signalfd.
  • Wait on a signalfd from another thread and send a message though a mchain.
  • Make a custom timer thread implementation that replaces the condition variable by an eventfd or self-pipe, so that it can simultaneously wait on other FDs or receive data from a signal handler.

[idea] Support of delivery filters for MPSC mboxes?

The current version of SObjectizer prohibits the usage of delivery filters with MPSC mboxes. The root of this ban is somewhere in the past. MPSC mboxes were created as an optimization trick to speed up message exchange in 1-to-1 scenarios. Mutable messages and the role of MPSC mboxes for mutable messages were discovered much later.

There is no big sense in support for delivery filters for standard MPSC mboxes (available via agent_t::so_direct_mbox()). But there can be some forms of custom MPSC mboxes (like unique-subscribers mboxes from upcoming so5extra-1.5), and the support for delivery filters for those custom mboxes could be crucial.

The problem is that the current version of SObjectizer doesn't provide a way to specify a delivery filter for a mutable message. Message mutability isn't supported by agent_t::so_set_delivery_filter methods.

Any benchmark program available?

Hi everyone,
this might be a weird question, please let me know if you need more context/information.

I am wondering if any of you has crafted a sort of SObjectizer benchmark program to get some "generic" insights on the performance and throughput of SObjectizer on different machines and targets.

Alternatively, do you have any general tips to write such a program myself? Which kind of things would you measure? Can I use any performance tests you have released already?

Note: I already have performance tests covering specific applications, and I also produce profile reports. Instead, I need a quick way to test and get expectations on SObjectizer's capabilities on many machines (possibly built with proprietary hardware) subjected to different kind of load.

Many thanks!

Marco

Synchronous shutdown

I experiment with switching to sobjectizer in an existing project. By now, the objects in this project get created and destroyed dynamically and they have internal threading (event loops) that get shutdown and waited for in the destructor.

I removed those internal threads and replaced them by companion actors that do the event handling. That works, but I want to make sure that an actor has actually shut down, before finishing the destructor. From what I see, this is currently not happening in a thread safe manor...

Example code

class MyClassCompanion : public so_5::agent_t {
    MyClass& _parent;
    // to things with parent on events
};


class MyClass {
    MyClassCompanion* _actor;
    MyClass() {
                so_5::introduce_child_coop(env.get_root_coop(), [&](so_5::coop_t& coop) {
                    _actor = coop.make_agent<MyClassCompanion>(*this);
                });
    }

    ~MyClass() {
          _actor->so_deregister_agent_coop_normally();
    }
};

When I run valgrind on my solution I see, that sometimes the actor is still accessing its _parent, although the destructor had finished. That leads me to the conclusion that so_deregister_agent_coop_normally is not synchronous, that after it returns something is still running...

  1. is so_deregister_agent_coop_normally thread-safe and synchronous?
  2. is the way of spawning agents in the constructor correct? is that thread-safe?

noexcept for closing mchains

At the moment abstract_message_chain_t::close() method and close_drop_content()/close_retain_content() methods is not marked as noexcept.

It seems that mchain closing shouldn't be a throwing operation, because it's often called from destructors.

This moment should be explored with additional care and those methods/functions have to be marked as noexcept if is possible from the implementation's point of view.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.