victimsnino / reactiveplusplus

Implementation of async observable/observer (Reactive Programming) in C++ with care about performance and templates in mind in ReactiveX approach

Home Page: https://victimsnino.github.io/ReactivePlusPlus/v2/docs/html/md_docs_2readme.html

License: Boost Software License 1.0

CMake 5.04% C++ 94.38% Python 0.58%
cpp cpp20 rxcpp reactive-programming reactivex modern-cpp observable observer-pattern push-model rpp rx

reactiveplusplus's Issues

Throttle operator

Hi there,
great to watch your progress with v2, and I already saw that you are about to integrate Qt again.
I have another request for an operator: this time it is "throttle".

Once Qt and throttle are there, I will switch to RppV2 and serve as a tester :)

Thanks again and best regards!

Re-implement scheduler's related code

  • (???) Split TrampolineScheduler into two:
    1. TrampolineScheduler, which JUST TRAMPOLINES with a LOCAL queue to schedule a "schedulable after the current schedulable has completed"
    2. CurrentThreadScheduler = TrampolineScheduler with thread_local queues
  • Re-implement scheduler-based operators in the following way:
    1. Instead of "just scheduling" each emission to the scheduler, the operator needs to have a local queue
    2. On a new emission, push the emission to the queue
    3. If the queue was empty before the push, schedule draining of the queue to the scheduler
    4. If the queue was not empty, do nothing, because "someone else" has already scheduled draining
    5. Recursively re-schedule the same schedulable to drain the queue while the queue is not empty.
  • (????) Should NewThread/RunLoop etc. store their queues as "CurrentThread" or not?

New issue like replacement for #277

Consider pushing builds to NuGet

NuGet does not require libraries to have reached version 1.0, and pushing development versions gives you more exposure, which in turn increases your feedback.

libuv / uvw integration

Hi @victimsnino,

I would like to come up with another request :)
As already mentioned before, I replaced Qt with libuv (or specifically uvw, libuv's C++ wrapper).
Do you plan any further main-loop integrations (besides Qt)?

That would be great. Thanks a lot!

Re-implement scheduler's related code

UPD:

  1. Split TrampolineScheduler into two:
    1. TrampolineScheduler, which JUST TRAMPOLINES with a LOCAL queue to schedule a "schedulable after the current schedulable has completed"
    2. CurrentThreadScheduler = TrampolineScheduler with thread_local queues
  2. Re-implement scheduler-based operators in the following way:
    1. Instead of "just scheduling" each emission to the scheduler, the operator needs to have a local queue
    2. On a new emission, push the emission to the queue
    3. If the queue was empty before the push, schedule draining of the queue to the scheduler
    4. If the queue was not empty, do nothing, because "someone else" has already scheduled draining
    5. Recursively re-schedule the same schedulable to drain the queue while the queue is not empty.
  3. (????) Should NewThread/RunLoop etc. store their queues as "CurrentThread" or not?
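
The queue-draining steps in item 2 can be sketched in plain C++ as follows. This is only an illustration under stated assumptions, not RPP code: manual_scheduler and draining_queue are hypothetical names, and the scheduler stand-in is single-threaded and simply runs tasks in FIFO order. The item being processed stays in the queue until its callback returns, so a concurrent producer would still see a non-empty queue and would not schedule a second drain.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for a scheduler: it only collects tasks, and run_all()
// executes them in FIFO order, including tasks that are added while it runs.
struct manual_scheduler
{
    std::deque<std::function<void()>> tasks;

    void schedule(std::function<void()> task) { tasks.push_back(std::move(task)); }

    void run_all()
    {
        while (!tasks.empty())
        {
            auto task = std::move(tasks.front());
            tasks.pop_front();
            task();
        }
    }
};

// Hypothetical helper: keeps a local queue of emissions and ensures that at most
// one "drain" schedulable is pending at any time.
// The object must outlive every task it has scheduled (tasks capture `this`).
template<typename T>
class draining_queue
{
public:
    draining_queue(manual_scheduler& scheduler, std::function<void(const T&)> on_next)
        : m_scheduler(scheduler)
        , m_on_next(std::move(on_next))
    {
    }

    void emit(T value)
    {
        bool was_empty = false;
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            was_empty = m_queue.empty();
            m_queue.push_back(std::move(value));
        }
        // Only the producer that found the queue empty schedules draining;
        // everyone else relies on the drain that is already pending.
        if (was_empty)
            m_scheduler.schedule([this] { drain_one(); });
    }

private:
    T front_copy()
    {
        std::lock_guard<std::mutex> lock{m_mutex};
        assert(!m_queue.empty());
        return m_queue.front(); // copy; the item stays queued while it is processed
    }

    void drain_one()
    {
        const T value = front_copy();
        m_on_next(value);

        bool has_more = false;
        {
            std::lock_guard<std::mutex> lock{m_mutex};
            m_queue.pop_front();
            has_more = !m_queue.empty();
        }
        // Re-schedule the same schedulable while there is still something to drain.
        if (has_more)
            m_scheduler.schedule([this] { drain_one(); });
    }

    manual_scheduler&             m_scheduler;
    std::function<void(const T&)> m_on_next;
    std::mutex                    m_mutex;
    std::deque<T>                 m_queue;
};
```

With three emissions pushed before the scheduler runs, only one task is pending, and running the scheduler delivers the values in emission order.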

Run loop scheduler

Hi, thanks for creating this great project that keeps reactive programming alive!

I wonder whether there is an implementation of, or a plan to implement, the run loop scheduler as in the rxcpp library? I found it effective for implementing cooperative multiprocessing on an embedded platform.

Thanks!

Please update RPP in conan center

As my current project relies heavily on Conan, and coincidentally I am using the distinct operator from v2.1.0 as a git submodule, please consider updating the package in Conan.

At the time of writing, the latest version on Conan is still v2.0.0, and I have filed an update request at Conan Center.

Let me know if there is anything I can do to speed up the process.

Game example(s)

Given the asynchronous nature of Rx and the performance focus of this library, it is a possible candidate for rapid prototyping of games, which essentially transform a stream of user interactions into a stream of graphics.

There is an example of Reactive programming used in a very elegant way, just composing together all the parts in a single statement, but it is in Scala.

There was also interest in this style of programming at Microsoft in the 90s, for animation and multimedia, of which the aforementioned Scala library is an example (I still need to read the paper, which has examples in Haskell).

If Rx is as suitable for games as I think, it would be an awesome tool for game jams once a library of reusable components has been built up. SDL2 would be a good starting point, I think, and I'd be keen to help out there where I can.

(I confess I only recently started trying to write simple 2D games in order to learn C++, although I have 20+ years as a software engineer and I've been itching to find an application of Rx I could get into)

How to refer to observable type anyway?

It's a common practice for a method to return an observable<T> to represent values of T in an async fashion.

class Person {
public:
   observable<std::string> Talk();
};

While rpp does provide an observable class, it has too many class template arguments, for which I can find very few explanations in the docs. Before I read your whole project code, would you please enlighten me about a simple way to spell the actual class type other than auto?
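
The usual way to get a nameable return type in this situation is type erasure. The implementation-status list further down this page names a "Dynamic Observable", which is presumably the library's answer. The sketch below does not use RPP at all: erased_observable is a hypothetical class that only illustrates the idea of hiding the concrete source type behind std::function so that a header can spell the return type.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical, library-independent illustration of a type-erased observable.
template<typename T>
class erased_observable
{
public:
    using observer = std::function<void(const T&)>;

    // Accepts any callable that takes an observer, whatever its concrete type is.
    explicit erased_observable(std::function<void(observer)> subscribe_fn)
        : m_subscribe{std::move(subscribe_fn)}
    {
        assert(static_cast<bool>(m_subscribe));
    }

    void subscribe(observer on_next) const { m_subscribe(std::move(on_next)); }

private:
    std::function<void(observer)> m_subscribe;
};

class Person
{
public:
    // The return type can be written out in a declaration; no `auto` is required.
    erased_observable<std::string> Talk() const
    {
        return erased_observable<std::string>{
            [](erased_observable<std::string>::observer on_next) {
                on_next("hello");
                on_next("world");
            }};
    }
};
```

The trade-off is the usual one for type erasure: the type is easy to name, but each subscription goes through an indirect call and possibly a heap allocation.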

Provide in-place manipulation of stateful subjects (behaviour subject)

Hey there,
I have another feature request for you :)
I was wondering whether one could manipulate the state of a behaviour subject in place.
Imagine the subject carries a complex object, e.g.
publish_subject<std::map<std::string, std::pair<double, double>>>.
Instead of replacing the whole map instance, it would be more efficient to change the map in place and emit manually.
What is your opinion on that?
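
To make the request concrete, here is a minimal sketch of what "modify in place, then emit" could look like. It is not RPP's API: mutable_state_subject and modify are hypothetical names, the class is single-threaded, and observers receive the stored value by const reference so that no copy of the map is made.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, library-independent stand-in for a behavior-subject-like holder.
template<typename T>
class mutable_state_subject
{
public:
    using observer = std::function<void(const T&)>;

    explicit mutable_state_subject(T initial)
        : m_state{std::move(initial)}
    {
    }

    // New subscribers immediately receive the current state, then later updates.
    void subscribe(observer obs)
    {
        assert(static_cast<bool>(obs));
        obs(m_state);
        m_observers.push_back(std::move(obs));
    }

    // Mutates the stored value in place, then notifies observers by const reference.
    template<typename Mutator>
    void modify(Mutator&& mutator)
    {
        std::forward<Mutator>(mutator)(m_state);
        for (const auto& obs : m_observers)
            obs(m_state);
    }

private:
    T                     m_state;
    std::vector<observer> m_observers;
};
```

A caller would write something like subject.modify([](auto& map) { map["x"] = {0.0, 1.0}; }) instead of building a new map and emitting it.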

Reemit from subscription

Hi there,
another question regarding some detail about your lib (still on v1, though).
Apparently, there is some issue when emitting something from within a subscription.

The following won't work:

publish_subject<int> subject1;
publish_subject<int> subject2;
subject1.subscribe([&subject2](int value) {
  subject2.get_subscriber().on_next(value+1); // This will block
});

However, this will do the trick:

publish_subject<int> subject1;
publish_subject<int> subject2;
subject1.subscribe(subject2.get_subscriber());

Because the operation is complex, I would prefer something with the first syntax (instead of using the map operator or similar).
Can you give me a hint here, please?

Thanks already! :)

Two broken links

Hi,

first, many thanks for this amazing library, which makes reactive finally usable in c++.

Please fix two broken links:

Thx.

segfault in snake

Built examples with

cmake -B build -DRPP_BUILD_SAMPLES=1 -DCMAKE_CXX_COMPILER=clang++-14 -DRPP_BUILD_SFML_CODE=1 -DRPP_BUILD_TESTS=1 -DCMAKE_BUILD_TYPE=Debug

cmake --build build

All tests passed,

but running build/bin/snake pops up the SFML window and then immediately segfaults.

Ubuntu 20.04 with clang 14 and SFML 2.5.1 (from the system apt repo)

gdb ./build/bin/snake

run

Program received signal SIGSEGV, Segmentation fault.
0x000055555558d719 in move_snake (body=..., direction_and_length=std::tuple containing = {...}) at /home/oliver/c/ReactivePlusPlus/src/samples/sfml/snake/snake.cpp:28
28	    auto head = body.back();

the backtrace is quite overwhelming

Add average operator (and other statistical)

Hi @victimsnino
I am opening another thread, since it might exceed my original request(s) a bit...

How open are you to non-standard (non-Rx) filters?

The average operator is already part of the original Rx definition (and I also have a use for it).
However, other filters come to mind, like exponential moving average or median filtering. Those are implemented in InfluxDB:
https://docs.influxdata.com/flux/v0.x/stdlib/universe/exponentialmovingaverage/
https://docs.influxdata.com/flux/v0.x/stdlib/universe/median/

I don't know whether there are implementations for other languages, but those filters would be pretty helpful for financial, statistical and signal processing applications (and for my application...).

Looking forward to your opinion on that.
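
As a rough illustration of the filters mentioned above, the following sketch implements a running average and an exponential moving average as small stateful callables. Both class names are hypothetical and nothing here is RPP API. The seeding rule for the exponential moving average (the first sample seeds the average) is an assumption and may differ from the functions linked above.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>

// Running arithmetic mean of all values seen so far (incremental update).
class running_average
{
public:
    double operator()(double value)
    {
        ++m_count;
        m_mean += (value - m_mean) / static_cast<double>(m_count);
        return m_mean;
    }

private:
    std::size_t m_count = 0;
    double      m_mean  = 0.0;
};

// Exponential moving average: ema = alpha * value + (1 - alpha) * previous_ema.
// Assumption: the first sample is used as the initial average.
class exponential_moving_average
{
public:
    explicit exponential_moving_average(double alpha)
        : m_alpha{alpha}
    {
        assert(alpha > 0.0 && alpha <= 1.0);
    }

    double operator()(double value)
    {
        m_value  = m_seeded ? m_alpha * value + (1.0 - m_alpha) * m_value : value;
        m_seeded = true;
        return m_value;
    }

private:
    double m_alpha;
    double m_value  = 0.0;
    bool   m_seeded = false;
};
```

Because each object carries its own state, one instance per subscription would be needed if such a callable were passed to a map-style operator.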

Create another v1 release

Hi there,
great to watch your effort on v2!

However, I am still using v1, since I am using operators and other features which are not completed yet for v2.
For v1 I also need a recent version, which I obtain via CMake's FetchContent and a commit hash. This has some drawbacks, since it takes a lot of time to check out. A better way would be to rely on a tag/release. So, are you planning to create another (or a last) release for the v1 branch?
That would be great :)

Thanks already.

Group-by test fails in debug mode

Discussed in #194

All the tests passed except for test_group_by.exe. It threw an "invalid comparator" exception at this line of the test:

obs.group_by(std::identity{}, std::identity{}, [](int, int){return true;}).subscribe([&](const auto& grouped)

which ultimately called this line and the emplace that crashed:

auto [itr, inserted] = state->key_to_subject.try_emplace(key);

The STL source code indicates that this fails the debug check that tests "if _Pred(_Left, _Right) and _Pred is strict weak ordering, when the arguments are the cv-same-type".

STL excerpt:
template <class _Pr, class _Ty1, class _Ty2,
    enable_if_t<is_same_v<_Remove_cvref_t<_Ty1>, _Remove_cvref_t<_Ty2>>, int> = 0>
constexpr bool _Debug_lt_pred(_Pr&& _Pred, _Ty1&& _Left, _Ty2&& _Right) noexcept(
    noexcept(_Pred(_Left, _Right)) && noexcept(_Pred(_Right, _Left))) {
    // test if _Pred(_Left, _Right) and _Pred is strict weak ordering, when the arguments are the cv-same-type
    const auto _Result = static_cast<bool>(_Pred(_Left, _Right));
    if (_Result) {
        _STL_VERIFY(!_Pred(_Right, _Left), "invalid comparator");
    }

    return _Result;
}

That's as far as I've gone on drilling down into the problem. I just wanted to give you a heads up.
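
The reason a comparator such as [](int, int){return true;} trips this check is that it claims both a < b and b < a, which no strict weak ordering may do. The sketch below reproduces that reasoning in isolation. The names are hypothetical, and checked_compare only imitates the spirit of the excerpt above rather than the STL's actual implementation.

```cpp
#include <cassert>
#include <functional>

// Comparator shaped like the one in the failing test: everything is "less".
struct always_true_comparator
{
    bool operator()(int, int) const { return true; }
};

// Returns true when comp breaks irreflexivity (a < a) or asymmetry
// (a < b and b < a at once) for the given pair of values.
template<typename Comp, typename T>
bool violates_strict_weak_ordering(Comp comp, const T& a, const T& b)
{
    return comp(a, a) || (comp(a, b) && comp(b, a));
}

// Debug-style wrapper: evaluates comp(a, b) and asserts that comp(b, a)
// is not true as well, similar in spirit to the check quoted above.
template<typename Comp, typename T>
bool checked_compare(Comp comp, const T& a, const T& b)
{
    const bool result = comp(a, b);
    if (result)
        assert(!comp(b, a) && "invalid comparator");
    return result;
}
```

Passing always_true_comparator to checked_compare aborts in a debug build, while std::less<int> passes; this matches the test failing only in debug mode.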

macOS/clang support

Hey there,
thanks for this nice lib.
I am trying to compile it on macOS with the clang that ships with it. Unfortunately, this does not work, since some C++20 features are missing from that clang and libc++: std::ranges::for_each, std::ranges::copy_if, std::stop_token.
https://en.cppreference.com/w/cpp/compiler_support/20#C.2B.2B14_library_features

My question is, will you consider compatibility with macOS and clang?
If yes, are there any plans on this in the near future?
Or would you accept PRs fixing macOS/clang compilation?

Thanks a lot!

RPP v2 - Implementation Status

The current implementation is really good and fast, BUT it does a lot of unnecessary heap allocations AND a lot of unnecessary moves/copies. The architecture needs to change to eliminate these unnecessary copies/moves:

  1. use rvalues wherever possible
  2. subscription -> disposable: instead of the observer owning the subscription and tracking ITS correctness, the observable provides a disposable and tracks the observable's correctness
  3. observer is not copyable at all
  4. Make RPP v2 a ZERO OVERHEAD library

Implementation status

Fundamentals

  • Observables
    • Specific Observable (was in v1)
    • Dynamic Observable (was in v1)
    • Blocking Observable (was in v1)
    • Connectable Observable (was in v1)
  • Observers
    • Specific Observer (was in v1)
    • Dynamic Observer (was in v1)
  • Disposables
    • Base Disposable
    • Callback Disposable (was in v1)
    • Composite Disposable (was in v1)
    • RefCount Disposable
  • Schedulers
    • Immediate (was in v1)
    • New Thread (was in v1)
    • CurrentThread (was in v1)
    • RunLoop (was in v1)
    • EventLoop

Creating Observables

  • Create (was in v1)
  • Just (was in v1)
    • + memory_model (was in v1)
  • empty/never/error
  • from
    • iterable (was in v1)
    • future
    • callable (was in v1)
    • async
  • defer
  • interval
  • range
  • repeat
  • timer
  • concat

Operators

Transforming

  • map (was in v1)
  • group_by (was in v1)
  • flat_map (was in v1)
  • scan
    • scan with initial seed (was in v1)
    • scan without seed
  • buffer
    • count (was in v1)
      • skip
    • time
    • time_or_count
  • window
    • count (was in v1)
      • skip
    • time
    • time_or_count
    • toggle

Filtering

  • filter (was in v1)
  • take (was in v1)
  • debounce (was in v1)
  • distinct
    • distinct
    • distinct_until_changed (was in v1)
  • element_at
  • first (was in v1)
  • ignore_elements
  • last (was in v1)
  • sample (was in v1)
    • sample (observable)
    • sample_with_time
  • skip (was in v1)
  • skip_last
  • take_last (was in v1)
  • throttle

Conditional

  • take_while (was in v1)
  • all
  • amb
  • contains
  • default_if_empty
  • sequence_equal
  • skip_until
  • skip_while
  • take_until (was in v1)

Combining

  • merge
    • observable of observables (was in v1)
    • merge with (was in v1)
    • merge delay error
  • switch
    • switch_map (was in v1)
    • switch_on_next (was in v1)
    • switch_if_empty
  • with_latest_from (was in v1)
  • start_with (was in v1)
  • combine_latest (was in v1)
  • zip

Aggregate

  • average
  • concat (was in v1)
  • count
  • max
  • min
  • reduce (was in v1)
  • sum

Backpressure

  • backpressure ???

Error handling

  • on_error_resume (was in v1)
  • retry

Utility

  • observe_on (was in v1)
  • repeat (was in v1)
    • scheduling (by default trampoline ?)
  • subscribe_on (was in v1)
  • delay (was in v1)
  • do/tap (was in v1)
    • tap with observer
    • tap with callbacks
    • do_on_next
    • do_on_error
    • do_on_completed
  • timeout (was in v1)
    • timeout
    • timeout with fallback observable
  • finally

Connectable

  • publish (was in v1)
  • multicast (was in v1)
  • connect (was in v1)
  • ref_count (was in v1)
  • replay

Subjects

  • publish_subject (was in v1)
  • behavior_subject (was in v1)
  • serialized_subject
  • replay_subject
  • async_subject

QT:

  • from_event (was in v1)
  • main_thread_scheduler (was in v1)

Question about sample_with_time (oversampling)

Hi There,
I have a question regarding the sample_with_time operator. From reading the documentation and running the example, it is clear that this operator can serve for undersampling:

            source observable              : +--1---2-3-4---5-6-7-|
            operator "sample_with_time(2)" : +--1---2---4---5---7-|

However, it does not seem to work with oversampling, e.g.:

            source observable              : +--1-----3-4---5-6-7-|
            operator "sample_with_time(2)" : +--1---1---4---5---7-|

I know this behaviour from RxJava, and I worked around it there using a time-based buffer.
I cannot tell whether that behaviour is really intended.

Anyway, I could use some time-based operators if oversampling will not work here (e.g. buffer with time).

What do you think?
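
The difference between the two behaviours in the diagrams above can be shown with a small discrete-time simulation. This is a sketch under assumptions: time is modelled as integer ticks, sample_every and timed_value are hypothetical names, and the repeat_latest flag selects between "emit only when a new value arrived in the period" and "re-emit the latest value every period" (oversampling).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// A source emission at a given integer tick (hypothetical time model).
struct timed_value
{
    int tick;
    int value;
};

// Samples `source` (assumed to be sorted by tick) every `period` ticks up to `last_tick`.
// repeat_latest == false: emit only if a new value arrived since the previous sample.
// repeat_latest == true : emit the latest known value at every sampling point.
inline std::vector<int> sample_every(const std::vector<timed_value>& source,
                                     int                             period,
                                     int                             last_tick,
                                     bool                            repeat_latest)
{
    assert(period > 0);

    std::vector<int> sampled;
    std::size_t      next       = 0;
    bool             has_latest = false;
    bool             fresh      = false;
    int              latest     = 0;

    for (int tick = period; tick <= last_tick; tick += period)
    {
        // Consume every source emission that happened up to this sampling point.
        while (next < source.size() && source[next].tick <= tick)
        {
            latest     = source[next].value;
            has_latest = true;
            fresh      = true;
            ++next;
        }
        if (has_latest && (fresh || repeat_latest))
            sampled.push_back(latest);
        fresh = false;
    }
    return sampled;
}
```

For a source that emits 1 at tick 1 and then 3 and 4 at ticks 7 and 8, sampling every 2 ticks yields 1, 4 without repetition and 1, 1, 1, 4 with repetition.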

Resolve "nano sleeps"

Currently we are doing sleep_for inside schedulers, BUT sleeping is a pretty expensive operation: if we call sleep_for(std::chrono::nanoseconds{1}), the actual sleep takes about 18-50k nanoseconds. So a sleep with a duration <= 18-50k ns is effectively unreachable with std.

UPD: Looks like clang/gcc uses nanosleep and it doesn't work
UPD: Looks like the best possible solution is to change RPP's granularity to microseconds instead
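
The overhead described above is easy to observe with a few lines of standard C++. The helper below is a hypothetical measurement function, not part of RPP; the numbers it returns depend on the operating system, the hardware and the current load, so the 18-50k ns figure quoted above should not be expected to reproduce exactly.

```cpp
#include <cassert>
#include <chrono>
#include <thread>

// Measures how long a requested sleep actually takes on this machine.
inline std::chrono::nanoseconds measure_sleep(std::chrono::nanoseconds requested)
{
    assert(requested >= std::chrono::nanoseconds::zero());

    const auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(requested);
    const auto elapsed = std::chrono::steady_clock::now() - start;

    return std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed);
}
```

Calling measure_sleep(std::chrono::nanoseconds{1}) a few times and printing the results gives a quick estimate of the smallest sleep the platform can realistically deliver.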

The trampoline scheduler in rpp seems slower than in rxcpp

We've found that the trampoline scheduler in rpp works a bit slower than the one in rxcpp on Ubuntu. We need to find the source of this issue.

Also, subscribe became slower after the introduction of the trampoline scheduler. It's unexpected to have such a huge performance hit.

timeout operator

The timeout operator can be very useful when dealing with communication protocols. However, not all languages implement this operator. Is there any plan to implement it in this library, or is there some alternative with the existing operators?

Thanks!

Rpp doesn't compile - VS22 - Windows - Latest release

Hi, I really need help. I've been trying to fix it for a long time.

I've been trying to compile ReactivePlusPlus, but for some reason it doesn't work.
Could you please help me to find a solution?

I followed the steps described in BUILDING.md and it didn't work.
Below are the errors I found and what I did to create my project.

These are the error messages when I try to build on Visual Studio 2022 version 4.8.04161:

Severity	Code	Description	Project	File	Line	Suppression State
Error	C3539	a template-argument cannot be a type that contains 'auto'	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\observers\specific_observer.hpp	81	
Error	C3539	a template-argument cannot be a type that contains 'auto'	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\observers\specific_observer.hpp	87	
Error	C3539	a template-argument cannot be a type that contains 'auto'	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\observers\specific_observer.hpp	94	
Error	C3539	a template-argument cannot be a type that contains 'auto'	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\observers\specific_observer.hpp	101	
Error	C2672	'rpp::details::member_overload<Type,SpecificObservable,rpp::details::subscribe_tag>::subscribe_impl': no matching overloaded function found	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\operators\fwd\subscribe.hpp	60	
Error	C2672	'make_specific_subscriber': no matching overloaded function found	DemoRPP	D:\cpp\DemoRPP\DemoRPP\rpp\operators\fwd\subscribe.hpp	60	

This is how I created my project:

(1) Empty C++ project on VS22 > Created main.cpp file ( https://pastebin.com/raw/qMBkwkiM )
main.cpp screenshot: https://ibb.co/fpPbKP6

(2) After that, I changed it to C++20 and later to C++ latest
C++20 project screenshot: https://ibb.co/q9FRGGV

(3) I downloaded the latest release and I added it to my project. rpp_release
Added RPP screenshot: https://ibb.co/3mPkdBk

How I added to my project: Properties > C/C++ > Additional Include Directories.

  • I can see that VS recognizes the folders and the lib - I can navigate inside the functions/methods definitions.
  • I tried different paths when I added it to my project.
  • I tried build x64 - Release and Debug mode

What should I do to fix it?

Thanks in advance

EDIT: VS22 preview is the issue here

creating an observable from C callback functions

I cannot figure out the correct way to create an observable from a C style emitter with callback functions. Example shape:

typedef void (*StringCallback)(const char* s);
typedef void (*DoneCallback)();
// we don't know how this function is implemented, only that it calls onstring()
// a number of times and ondone() once, almost certainly from another thread.
void do_async_thing(StringCallback onstring, DoneCallback ondone);

I have a solution of sorts, but it is almost certainly "wrong" as it involves creating a dynamic object and cleaning up. Rather than lead with my incorrect approach, better to just ask flatly how to turn the above into an observable to which I can subscribe. The key point in the setup above is that all the "state" is maintained inside the black box which is do_async_thing(). It takes care of whatever state it needs to emit C strings, I don't. I am sure the solution is "obvious" once I see it, but I've tried at length and appear to be missing something fundamental. TIA.
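
Because these callbacks carry no user-data pointer, the C++ handlers have to live in storage that a plain function can reach. One possible shape is sketched below under clearly stated assumptions: all names other than StringCallback and DoneCallback are hypothetical, only one call may be in flight at a time, the handlers are not synchronized, and fake_do_async_thing is a synchronous stand-in for the real black box. In an Rx library, the body of subscribe_to_c_emitter would sit inside a create-style source (the status list on this page names "Create"), with the two handlers forwarding to the observer.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

typedef void (*StringCallback)(const char* s);
typedef void (*DoneCallback)();

// Function-local statics hold the C++ handlers that the trampolines forward to.
// Assumption: at most one emitter call is in flight at any time.
inline std::function<void(std::string)>& string_handler()
{
    static std::function<void(std::string)> handler;
    return handler;
}

inline std::function<void()>& done_handler()
{
    static std::function<void()> handler;
    return handler;
}

// Plain functions, so their addresses match the C callback types.
inline void string_trampoline(const char* s)
{
    if (string_handler())
        string_handler()(s ? s : "");
}

inline void done_trampoline()
{
    if (done_handler())
        done_handler()();
}

using emitter_fn = void (*)(StringCallback, DoneCallback);

// Installs the handlers first, then starts the C-style emitter.
inline void subscribe_to_c_emitter(emitter_fn                       emitter,
                                   std::function<void(std::string)> next,
                                   std::function<void()>            done)
{
    assert(emitter != nullptr);
    string_handler() = std::move(next);
    done_handler()   = std::move(done);
    emitter(&string_trampoline, &done_trampoline);
}

// Hypothetical synchronous stand-in for do_async_thing(), for demonstration only.
inline void fake_do_async_thing(StringCallback onstring, DoneCallback ondone)
{
    onstring("a");
    onstring("b");
    ondone();
}
```

If the real emitter calls back from another thread, the handlers run on that thread, so whatever they touch must tolerate that.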

Polish early_unsubscribe

The early_unsubscribe feature works great, but we can also extend it by adding checks in callbacks for whether someone has requested unsubscribe.

QT support

Add support for QT as rppqt:

  1. Create sources to create observable from signal
  2. Create scheduler to schedule emissions to "main" thread
  3. Create some examples

Fails to compile with clang11

README claims that clang11 is supported, but when I try the basic example with clang11 on Linux, compilation fails:

In file included from /home/conan/w/prod/BuildSingleReference/conan-center-index/recipes/reactiveplusplus/all/test_package/test_package.cpp:1:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/rpp.hpp:13:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/observables.hpp:19:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/observables/specific_observable.hpp:14:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/observables/fwd.hpp:13:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/subscribers/fwd.hpp:13:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/observers/constraints.hpp:13:
In file included from /home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/observers/fwd.hpp:13:
/home/conan/w/prod/BuildSingleReference/.conan/data/reactiveplusplus/0.1.2/_/_/package/5ab84d6acfe1f23c4fae0ab88f26e3a396351ac9/include/rpp/utils/constraints.hpp:13:10: fatal error: 'concepts' file not found
#include <concepts>

Could you provide exact minimum supported versions of major compilers please (gcc, clang, msvc, apple-clang, though I guess apple-clang is not supported at all since it fails to compile even with latest version apple-clang 14 due to lack of std::ranges support)?
