paragroup / windflow

A C++17 Data Stream Processing Parallel Library for Multicores and GPUs

License: GNU Lesser General Public License v3.0

Introduction

WindFlow is a C++17 header-only library for parallel data stream processing targeting heterogeneous shared-memory architectures equipped with multi-core CPUs and NVIDIA GPUs. The library provides traditional stream processing operators like map, flatmap, filter, reduce as well as window-based operators. The API allows building streaming applications through the MultiPipe and the PipeGraph programming constructs. The first is used to create parallel pipelines (with shuffle connections), while the second allows several MultiPipe instances to be interconnected through merge and split operations, in order to create complex directed acyclic graphs of interconnected operators.
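To give a feel for the API, below is a minimal sketch of a linear pipeline (source -> map -> sink). The builder names and functor signatures follow recent WindFlow releases, but details such as the PipeGraph constructor arguments may differ across versions, so treat this as illustrative rather than definitive:

    #include<iostream>
    #include<optional>
    #include<utility>
    #include<windflow.hpp>

    using namespace wf;

    int main()
    {
        PipeGraph graph("toy_app", Execution_Mode_t::DEFAULT, Time_Policy_t::INGRESS_TIME);
        // source: emits the integers 0..99 and then terminates the stream
        Source source = Source_Builder([](Source_Shipper<int> &shipper) {
                            for (int i = 0; i < 100; i++) {
                                int t = i;
                                shipper.push(std::move(t));
                            }
                        }).withParallelism(1).withName("source").build();
        MultiPipe &mp = graph.add_source(source);
        // map: doubles each value in place
        Map map = Map_Builder([](int &x) { x = 2 * x; })
                      .withParallelism(2).withName("map").build();
        mp.add(map);
        // sink: prints every received value
        Sink sink = Sink_Builder([](std::optional<int> &x) {
                        if (x) { std::cout << *x << std::endl; }
                    }).withParallelism(1).withName("sink").build();
        mp.add_sink(sink);
        graph.run(); // blocking call: returns when the stream is exhausted
        return 0;
    }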

Analogously to existing popular stream processing engines like Apache Storm and Apache Flink, WindFlow supports general-purpose streaming applications by enabling operators to run user-defined code. The WindFlow runtime system has been designed to be suitable for embedded architectures equipped with low-power multi-core CPUs and integrated NVIDIA GPUs (like the Jetson family of NVIDIA boards). However, it also works well on traditional multi-core servers equipped with discrete NVIDIA GPUs.

At the moment, WindFlow supports single-node execution only. We are working on a distributed implementation.

The web site of the library is available at: https://paragroup.github.io/WindFlow/.

Dependencies

The library requires the following dependencies:

  • a C++ compiler with full support for C++17 (the WindFlow tests have been successfully compiled with both GCC and Clang)
  • FastFlow version >= 3.0 (https://github.com/fastflow/fastflow)
  • CUDA (version >= 11.5 is preferred when using operators targeting GPUs)
  • libtbb-dev (required by GPU operators only)
  • libgraphviz-dev and rapidjson-dev (required when compiling with -DWF_TRACING_ENABLED to report statistics and to use the Web Dashboard for monitoring purposes)
  • librdkafka-dev (for the Kafka integration, i.e., the special Kafka_Source and Kafka_Sink operators)
  • librocksdb-dev (for the suite of persistent operators keeping their internal state in the RocksDB key-value store)
  • doxygen (to generate the documentation)

Important note about the FastFlow dependency: after downloading FastFlow, the user needs to configure the library for the underlying multi-core environment. By default, FastFlow pins its threads onto the cores of the machine. To make FastFlow aware of the ordering of cores, and of their correspondence to CPUs and NUMA regions, it is important to run (just once) the script "mapping_string.sh" in the folder fastflow/ff before compiling your WindFlow programs.
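For example (a typical one-time setup; the script only needs to be re-run if the hardware changes):

    $ git clone https://github.com/fastflow/fastflow.git
    $ cd fastflow/ff
    $ ./mapping_string.sh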

Macros

WindFlow, and its underlying layer FastFlow, come with some important macros that can be used during compilation to enable specific behaviors. Some of them are reported below:

  • -DWF_TRACING_ENABLED -> enables tracing (logging) at the WindFlow level (operator replicas), and allows streaming applications to continuously report statistics to a Web Dashboard (which is a separate sub-project). Outputs are also written to log files at the end of the processing
  • -DTRACE_FASTFLOW -> enables tracing (logging) at the FastFlow level (raw threads and FastFlow nodes). Outputs are written to log files at the end of the processing
  • -DFF_BOUNDED_BUFFER -> enables the use of bounded lock-free queues for pointer passing between threads. Otherwise, queues are unbounded (no backpressure mechanism)
  • -DDEFAULT_BUFFER_CAPACITY=VALUE -> sets the capacity of the lock-free queues. The default is 2048 entries
  • -DNO_DEFAULT_MAPPING -> if this macro is enabled, FastFlow threads are not pinned onto CPU cores but are left to be scheduled by the operating system
  • -DBLOCKING_MODE -> if this macro is enabled, FastFlow queues use the blocking concurrency mode (pushing to a full queue or polling from an empty queue might suspend the underlying thread). If not set, waiting conditions are implemented by busy-waiting spin loops

Some macros are useful to configure the runtime system when GPU operators are utilized in your application. The default version of the GPU support is based on explicit CUDA memory management and overlapped data transfers, which is suitable for a wide range of NVIDIA GPU models. However, the developer might want to switch to a different implementation that makes use of the CUDA unified memory support. This can be done by compiling with the macro -DWF_GPU_UNIFIED_MEMORY. Alternatively, the user can configure the runtime system to use pinned memory on NVIDIA System-on-Chip devices (e.g., Jetson Nano and Jetson Xavier), where pinned memory is directly accessed by both CPU and GPU without extra copies. This can be done by compiling with the macro -DWF_GPU_PINNED_MEMORY.
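As an illustration, the following hypothetical compile line enables the unified-memory implementation (the paths are placeholders for your FastFlow and WindFlow checkouts; -ltbb is needed because GPU operators depend on TBB):

    $ nvcc -x cu -std=c++17 -DWF_GPU_UNIFIED_MEMORY \
          -I<FASTFLOW_ROOT> -I<WINDFLOW_ROOT>/wf \
          my_gpu_app.cpp -o my_gpu_app -ltbb -pthread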

Build the Examples

WindFlow is a header-only template library. To build your applications, you have to include the main header of the library (windflow.hpp). To use the operators targeting GPUs, you further have to include the windflow_gpu.hpp header file and compile using the nvcc CUDA compiler (or Clang with CUDA support). The source code in this repository includes several examples that can be used to understand the use of the API and the advanced features of the library. The examples can be found in the tests folder. To compile them:

    $ cd <WINDFLOW_ROOT>
    $ mkdir ./build
    $ cd build
    $ cmake ..
    $ make -j<no_cores> # compile all the tests (not the doxygen documentation)
    $ make all_cpu -j<no_cores> # compile only CPU tests
    $ make all_gpu -j<no_cores> # compile only GPU tests
    $ make docs # generate the doxygen documentation (if doxygen has been installed)
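To compile a standalone application outside the CMake machinery, a minimal sketch of a compile line is the following (the include paths are placeholders for your FastFlow and WindFlow checkouts):

    $ g++ my_app.cpp -std=c++17 -I<FASTFLOW_ROOT> -I<WINDFLOW_ROOT>/wf -o my_app -pthread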

In order to use the Kafka integration, consisting of special Source and Sink operators, the developer has to include the additional header kafka/windflow_kafka.hpp and link against librdkafka. Analogously, to use persistent operators, you need to include the header persistent/windflow_rocksdb.hpp and link against librocksdb.
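For example (a sketch, assuming the development packages are installed system-wide; the exact linker flags may vary by distribution):

    $ g++ kafka_app.cpp -std=c++17 -I<FASTFLOW_ROOT> -I<WINDFLOW_ROOT>/wf \
          -o kafka_app -lrdkafka++ -lrdkafka -pthread
    $ g++ rocksdb_app.cpp -std=c++17 -I<FASTFLOW_ROOT> -I<WINDFLOW_ROOT>/wf \
          -o rocksdb_app -lrocksdb -pthread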

Docker Images

Two Docker images are available in the WindFlow GitHub repository. The images contain all the synthetic tests compiled and ready to be executed. To build the first image (the one without tests using GPU operators), execute the following commands:

    $ cd <WINDFLOW_ROOT>
    $ cd dockerimages
    $ docker build -t windflow_nogpu -f Dockerfile_nogpu .
    $ docker run windflow_nogpu ./bin/graph_tests/test_graph_1 -r 1 -l 10000 -k 10

The last command executes one of the synthetic experiments (test_graph_1). You can execute any of the compiled tests in the same manner.

The second image contains all synthetic tests with GPU operators. To use your GPU device with Docker, please follow the guidelines on the following page: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html. Then, you can build the image and run the container as follows:

    $ cd <WINDFLOW_ROOT>
    $ cd dockerimages
    $ docker build -t windflow_gpu -f Dockerfile_gpu .
    $ docker run --gpus all windflow_gpu ./bin/graph_tests_gpu/test_graph_gpu_1 -r 1 -l 10000 -k 10

Again, the last command executes one of the synthetic experiments (test_graph_gpu_1). You can execute any of the compiled tests in the same manner.

Web Dashboard

WindFlow has its own Web Dashboard that can be used to profile and monitor the execution of running WindFlow applications. The dashboard code is in the sub-folder WINDFLOW_ROOT/dashboard. It is a Java package based on Spring (for the web server) with a front-end developed in React. To start the Web Dashboard, run the following commands:

    cd <WINDFLOW_ROOT>/dashboard/Server
    mvn spring-boot:run

The web server listens on the default port 8080 of the machine. To change the port and other configuration parameters, users can modify two files: WINDFLOW_ROOT/dashboard/Server/src/main/resources/application.properties for the Spring server (e.g., to change the HTTP port), and WINDFLOW_ROOT/dashboard/Server/src/main/java/com/server/CustomServer/Configuration/config.json for the internal server receiving reports of statistics from the WindFlow applications (e.g., to change the port used by applications to report statistics to the dashboard).

WindFlow applications compiled with the macro -DWF_TRACING_ENABLED try to connect to the Web Dashboard and report statistics to it every second. By default, the applications assume that the dashboard is running on the local machine. To change the hostname and the port number, developers can use the macros -DWF_DASHBOARD_MACHINE=hostname/ip_addr and -DWF_DASHBOARD_PORT=port_number.
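For example (the hostname and port below are placeholder values; additional libraries required by -DWF_TRACING_ENABLED, such as graphviz, must be linked as well):

    $ g++ my_app.cpp -std=c++17 -DWF_TRACING_ENABLED \
          -DWF_DASHBOARD_MACHINE=dashboard.local -DWF_DASHBOARD_PORT=20207 \
          -I<FASTFLOW_ROOT> -I<WINDFLOW_ROOT>/wf -o my_app -pthread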

About the License

From version 3.1.0, WindFlow is released under a dual license: LGPL-3 and MIT. Programmers should check the licenses of the other libraries used as dependencies.

Cite our Work

In order to cite our work, we kindly ask interested people to use the following references:

@article{WindFlow,
 author={Mencagli, Gabriele and Torquati, Massimo and Cardaci, Andrea and Fais, Alessandra and Rinaldi, Luca and Danelutto, Marco},
 journal={IEEE Transactions on Parallel and Distributed Systems},
 title={WindFlow: High-Speed Continuous Stream Processing With Parallel Building Blocks},
 year={2021},
 volume={32},
 number={11},
 pages={2748-2763},
 doi={10.1109/TPDS.2021.3073970}
}
@article{WindFlow-GPU,
 title = {General-purpose data stream processing on heterogeneous architectures with WindFlow},
 journal = {Journal of Parallel and Distributed Computing},
 volume = {184},
 pages = {104782},
 year = {2024},
 issn = {0743-7315},
 doi = {10.1016/j.jpdc.2023.104782},
 url = {https://www.sciencedirect.com/science/article/pii/S0743731523001521},
 author = {Gabriele Mencagli and Massimo Torquati and Dalvan Griebler and Alessandra Fais and Marco Danelutto},
}

Requests for Modifications

If you are using WindFlow for your purposes and you are interested in specific modifications of the API (or of the runtime system), please send an email to the maintainer.

Contributors

The main developer and maintainer of WindFlow is Gabriele Mencagli (Department of Computer Science, University of Pisa, Italy).


windflow's Issues

Send EventId along the Pipeline

Hi,

I am using this library to build a calculation pipeline (an event-driven pipeline sourced from Kafka).
I am trying to use RuntimeContext to share the eventId with downstream operators (so that I have better visibility into the calculation steps).

But I see that RuntimeContext is not shared between operators, and I don't want to include this EventId in my pipeline's messages. Any ideas?

Thanks

Allowing nodes to process different types of input.

Dear Gabriele, Massimo,

I have noticed that the static asserts in the WindFlow code forbid a node from processing different types of input at the same time. I can see this in the sink, map, and builder code, among others. Here is an example:

class MapFunctor
{
public:
    inline const void *operator()(const TypeOne *const &x) { return nullptr; }
    inline const void *operator()(const TypeTwo *const &x) { return nullptr; }
};

If both of the functions are present in the code, then the compilation fails with errors like these:

map.hpp: error: static assertion failed: WindFlow Compilation Error - Map_Replica does not have a valid functional logic:

   63 |     static_assert(isInPlaceNonRiched || isInPlaceRiched || isNonInPlaceNonRiched || isNonInPlaceRiched,
      |                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~
builders.hpp: error: static assertion failed: WindFlow Compilation Error - unknown signature passed to the Map_Builder:
  Candidate 1 : void(tuple_t &)
  Candidate 2 : void(tuple_t &, RuntimeContext &)
  Candidate 3 : result_t(const tuple_t &)
  Candidate 4 : result_t(const tuple_t &, RuntimeContext &)

  311 |     static_assert(!(std::is_same<tuple_t, std::false_type>::value || std::is_same<result_t, std::false_type>::value),
      |                                                             ^~~~~

If one of the two functions in MapFunctor is commented out, the code compiles. If both functions are present and the static_assert lines above are commented out in the WindFlow source code, the code also compiles.

Could you please let me know whether there is an important technical reason to avoid processing more than one input type in the same WindFlow node (assuming that thread safety is properly taken care of)? Or are the errors above only due to overly restrictive conditions in the static_assert calls?

Thank you very much for your help!

WindFlow-3.0.0 examples cannot be compiled with fastflow-3.0.0

WindFlow-3.0.0 examples cannot be compiled with fastflow-3.0.0; the master source code of FastFlow is required.
The problem is the changed interface of the ff::ff_a2a::change_secondset function, which is now

change_secondset(const std::vector<T*>& w, bool cleanup=false, bool remove_from_cleanuplist=false)

but it was

change_secondset(const std::vector<ff_node*>& w, bool cleanup=false)

in https://github.com/fastflow/fastflow/blob/3.0.0/ff/all2all.hpp

$ gcc Test1.C -std=c++17 -I $HOME/bin/fastflow-3.0.0 -I $HOME/bin/WindFlow-3.0.0/wf -o Test1
In file included from .../WindFlow-3.0.0/wf/pipegraph.hpp:34,
                 from .../WindFlow-3.0.0/wf/windflow.hpp:47,
                 from Test1.C:10:
.../WindFlow-3.0.0/wf/multipipe.hpp: In member function ‘std::vector<ff::ff_node*> wf::MultiPipe::normalize()’:
.../WindFlow-3.0.0/wf/multipipe.hpp:568:23: error: ‘class ff::ff_a2a’ has no member named ‘remove_from_cleanuplist’
  568 |                 last->remove_from_cleanuplist(first_set_last);
      |                       ^~~~~~~~~~~~~~~~~~~~~~~
.../WindFlow-3.0.0/wf/multipipe.hpp:573:83: error: no matching function for call to ‘ff::ff_a2a::change_secondset(std::vector<ff::ff_node*>&, bool, bool)’
  573 |                 secondToLast->change_secondset(second_set_secondToLast, true, true);
      |                                                                                   ^
In file included from .../fastflow-3.0.0/ff/farm.hpp:68,
                 from .../fastflow-3.0.0/ff/optimize.hpp:44,
                 from .../fastflow-3.0.0/ff/pipeline.hpp:1759,
                 from .../fastflow-3.0.0/ff/ff.hpp:28,
                 from Test1.C:9:
.../fastflow-3.0.0/ff/all2all.hpp:298:9: note: candidate: ‘int ff::ff_a2a::change_secondset(const std::vector<ff::ff_node*>&, bool)’
  298 |     int change_secondset(const std::vector<ff_node*>& w, bool cleanup=false) {
      |         ^~~~~~~~~~~~~~~~
.../fastflow-3.0.0/ff/all2all.hpp:298:9: note:   candidate expects 2 arguments, 3 provided

Toolchain to build WindFlow 2.2.0 examples on MacOS

Hi @mencagli,

I'm having some trouble running make on the most recent version of WindFlow. I use macOS Mojave. g++ --version returns Apple LLVM version 10.0.1 (clang-1001.0.46.4); g++ is mapped to clang somehow.

In our classes I saw that you use a Mac as well, so I suppose you successfully build the WindFlow examples. However, when I try, the following compilation error happens in a file inside the includes folder:

In file included from test_mp_kf+pf_cb.cpp:42:
In file included from ../../includes/windflow.hpp:34:
../../includes/builders.hpp:1586:43: error: no matching function for call to 'get_tuple_t'
    using panefarm_t = Pane_Farm<decltype(get_tuple_t(func_F)),

Since the error comes from the template library, I think it would prevent me from using it.

I'll try using older versions just to check whether they build successfully. I'll also try to change the toolchain to the real GCC one, or update to Catalina in the hope that it upgrades clang as well. So my question also is: which toolchain do you currently use to build WindFlow?

Disabling CPU affinity of auxiliary nodes to separate cores

First, you have created an awesome product! Thank you! We use it and we love it - very fast, reliable, and convenient.

I do have one issue (related to optimizing how CPU cores are used) that I am trying to resolve. In my test, I have created a very simple graph consisting of one source and one sink (both with parallelism=1). I then checked the number of nodes in the graph with graph.getNumThreads(), which is equal to 2 as expected. When I run the code and look at the threads in Linux top, I can see that indeed only 2 threads run. However, in the (FastFlow) destructor ~ff_node, I have added a printout of the CPU core a given node was pinned to, and I can see that 3 nodes are actually destroyed, each pinned to a different CPU core (0, 1, 2). The myid values of the nodes are 0, 1, 0, so it seems that two nodes are copies of each other: they are pinned to different CPU cores, but only one of them actually runs (the nodes pinned to cores 0 and 1 run, while the node pinned to core 2 does nothing according to top):

// printouts by ff_node::~ff_node
CPUId: 0 myid:0
CPUId: 1 myid:1
CPUId: 2 myid:0

So, to increase the number of CPU cores available to the nodes doing actual computing, how can I disable setting the CPU affinity for the node that ends up not being used in the graph?

Thank you very much for your help!

Can source be implemented as a callback function?

I am evaluating whether WindFlow could be used in our project requiring low-latency processing of streaming data. I would appreciate it if somebody could let me know whether a source functor in WindFlow can be implemented as a callback function, or whether the source functor always runs in its own thread in a hot loop (constantly checking whether the source has any new input to process).

Looking at the usage example (class Source_Functor), it seems that operator() causes inputs to be added to the shipper, but how does the code know when there is anything to be shipped, i.e., that a new incoming message has arrived at the source?

I do not see relevant examples in the tests directory either. In a true streaming application, the input samples are not generated all at once at the beginning of the program (which is what I see in the tests); instead, the code constantly listens for incoming messages and calls a processing callback function on each message (or a group of them), which I expect to happen in the source nodes; that triggers the propagation of information through the computing graph, and the process continues until the code receives a signal to stop consuming incoming messages. Is this possible with WindFlow?

Thank you very much for your help!
