An advanced distributed task flow management on top of Celery

Home Page: https://selinon.readthedocs.io/

License: Other

selinon's Introduction

Selinon

An advanced task flow management on top of Celery.

Is this project helpful? Send me a simple warm message!

Crossroad

Last stable release: Selinon 1.3.0

TLDR;

An advanced flow management above Celery (an asynchronous distributed task queue) written in Python3, that allows you to:

  • Dynamically schedule tasks based on results of previous tasks
  • Group tasks into flows in simple YAML configuration files
  • Schedule flows from other flows (even recursively)
  • Store results of tasks in your storages and databases transparently, validate results against defined JSON schemas
  • Do redeployment respecting changes in the YAML configuration files without purging queues (migrations)
  • Track flow progress via the built-in tracing mechanism
  • Complex per-task or per-flow failure handling with fallback tasks or fallback flows
  • No DAG limitation in your flows
  • Selectively pick tasks in your flow graphs that should be executed respecting task dependencies
  • Make your deployment easy to orchestrate using orchestration tools such as Kubernetes
  • Highly scalable Turing complete solution for big data processing pipelines
  • And (of course) much more... check docs

YouTube Video

Let's explain Selinon using a YouTube video (click to redirect to YouTube).

About

This tool is an implementation above Celery that enables you to define flows and dependencies in flows, schedule tasks based on results of Celery workers, their success or any external events. If you are not familiar with Celery, check out its homepage www.celeryproject.org or this nice tutorial.

Selinon was originally designed to take care of advanced flows in one of Red Hat's products, where it has already served thousands of flows and tasks. Its main aim is to simplify specifying groups of tasks, group tasks into flows, handle data and execution dependencies between tasks and flows, make tasks and flows easy to reuse, model advanced execution units in YAML configuration files, and make the whole system easy to model, maintain and debug.

By placing the declarative configuration of the whole system into YAML files you can keep tasks as simple as needed. Storing results of tasks in databases, modeling dependencies and executing fallback tasks/flows on failures are separated from task logic. This gives you the power to dynamically change task and flow dependencies on demand, optimize data retrieval from and data storage to databases on a per-task basis, or even track progress based on events traced in the system.

Selinon was designed to serve millions of tasks in clusters or data centers orchestrated by Kubernetes, OpenShift or any other orchestration tool, but it can simplify even small systems. Moreover, Selinon can make them easily scalable in the future and make a developer's life much easier.

A Quick First Overview

Selinon is serving recipes in a distributed environment, so let's make a dinner!

If we want to make a dinner, we need to buy ingredients. These ingredients are bought in buyIngredientsFlow. This flow consists of multiple tasks, but let's focus on our main flow. Once all ingredients are bought, we can start preparing our dinner in prepareFlow. Again, this flow consists of some additional steps that need to be done in order to accomplish our future needs. As you can see, if anything goes wrong in the mentioned flows (see red arrows), we fall back to ordering pizza with beer. To keep the beer cool, we place it in our Fridge storage. If prepareFlow finishes successfully after successful shopping, we can proceed to serveDinnerFlow.

Just to point out - grey nodes represent flows (which can be made of other flows or tasks) and white (rounded) nodes are tasks. Conditions are represented in hexagons (see below). Black arrows represent time or data dependencies between our nodes, grey arrows pinpoint where results of tasks are stored.

For our dinner we need eggs, flour and some additional ingredients. Moreover, we conditionally buy a flower based on our condition. Our task BuyFlowerTask will not be scheduled (or executed) if our condition is False. Conditions are made of predicates, and these predicates can be grouped as desired with logical operators. You can define your own predicates if you want (the default ones are available in selinon.predicates). Everything that is bought is stored in Basket storage transparently.
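The way predicates combine into a condition can be sketched in plain Python. This is only an illustration and not Selinon's implementation: the helper names (field_equal, field_exists, all_of, any_of, negate), the message keys and the should_buy_flower condition are all hypothetical, and the built-in predicates in selinon.predicates may use different names and signatures.

```python
from typing import Any, Callable, Dict

Message = Dict[str, Any]
Predicate = Callable[[Message], bool]


def field_equal(key: str, value: Any) -> Predicate:
    """Hypothetical predicate: true if message[key] equals value."""
    return lambda message: message.get(key) == value


def field_exists(key: str) -> Predicate:
    """Hypothetical predicate: true if key is present in the message."""
    return lambda message: key in message


def all_of(*predicates: Predicate) -> Predicate:
    """Logical AND over predicates."""
    return lambda message: all(p(message) for p in predicates)


def any_of(*predicates: Predicate) -> Predicate:
    """Logical OR over predicates."""
    return lambda message: any(p(message) for p in predicates)


def negate(predicate: Predicate) -> Predicate:
    """Logical NOT of a predicate."""
    return lambda message: not predicate(message)


# Hypothetical condition guarding BuyFlowerTask: buy a flower only for an
# anniversary and only if nobody reported a flower allergy.
should_buy_flower = all_of(
    field_equal("occasion", "anniversary"),
    negate(field_exists("flower_allergy")),
)

print(should_buy_flower({"occasion": "anniversary"}))  # True
print(should_buy_flower({"occasion": "birthday"}))     # False
```

If the composed condition evaluates to False, the guarded task is simply never scheduled.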

Let's visualise our buyIngredientsFlow:

As stated in our main flow, after buying ingredients we proceed to dinner preparation, but first we need to check our recipe that is hosted at http://recipes.lan/how-to-bake-pie.html. Any ingredients we bought are transparently retrieved from the storage defined in our YAML configuration file. We warm up our oven to the expected temperature, and once the temperature is reached and we have finished the dough, we can proceed to baking.

Based on the description above, our prepareFlow will look like the following graph:

Once everything is done we serve plates. As we want to serve plates for all guests, we need to make sure we schedule N tasks of type ServePlateTask. Each time we run our whole dinner flow, the number of guests may vary, so make sure no guest stays hungry. Our serveDinnerFlow would look like the following graph:

This example demonstrates very simple flows. The whole configuration can be found here. Check out how easily you can define your flows! You can find a script that visualises graphs based on the YAML configuration in this repo as well.

More info

The example was intentionally simplified. You can also parametrize your flows, schedule N tasks (where N is a run-time evaluated variable), cache results, place tasks on separate queues to allow fluent system updates, throttle execution of certain tasks in time, propagate results of tasks to sub-flows, etc. Just check the documentation for more info.

Live Demo

A live demo with a few examples can be found here. Feel free to check it out.

Installation

$ pip3 install selinon

Available extras:

Extras can be installed via:

$ pip3 install selinon[celery,mongodb,postgresql,redis,s3,sentry]

Feel free to select only needed extras for your setup.

selinon's People

Contributors

fridex, joebeeson, shaded-enmity, vlagorsse

selinon's Issues

Return code of pylint is ignored

With change #11, CI runs pylint, but the result of pylint is ignored as there are currently errors. Once the errors are fixed, the pylint check should be enforced.

Outdated documentation

The current state of the documentation is out of sync with the sources. It needs to be updated.

Fatal dispatcher failure when there are issues with DB/storage

Currently, the dispatcher fails if there are issues with the DB/storage. For example, if a flow is scheduled and results cannot be retrieved from the database in order to evaluate an edge condition, the dispatcher task fails, causing a fatal flow failure. It should retry by default (possibly on another cluster node), or this behaviour should be configurable.

Provide an option for eager stopping on task/flow failures

Currently, if a node (flow or task) fails in a flow, the dispatcher keeps scheduling other tasks, so the flow does not depend on the timing of task execution on different nodes. There could, however, be use cases where it would be better to stop scheduling new tasks and mark the flow as failed immediately.

Add support for Cache in task results

With #14 implemented, caches could be reused to cache results of tasks (Celery tasks). The difference is that results could be cached only if a Celery task is marked as finished. This caching mechanism could be useful for the dispatcher in order to check the state of tasks.

Tainted edges in migrations could cause flow retry/failure too early

As the state carried within a flow does not record whether a given edge was actually triggered or is just in the edge wait list (waiting_edges), a corner case can occur where a migration marks the flow as failed even though the edge was not triggered.

A fix for this should distinguish between triggered_edges and waiting_edges in the flow. The check for a tainted flow should then respect triggered_edges.

How to call task flows defined in a YAML config file from a remote server?

Issue description:

No document describes how to call flows defined in a YAML config file.

Steps to reproduce:

Expected behavior:

Actual behavior:

Links to logs:

A link to logs posted on http://pastebin.com/ (or any other publicly reachable service).

Are you able to reproduce the issue with selinonlib-cli simulate?

Selinon & Selinonlib & Celery version:

Running selinonlib-cli version gave me:

  • I have tried the latest master but the issue still persists:
    pip3 install --force-reinstall git+https://github.com/selinon/selinonlib@master
    pip3 install --force-reinstall git+https://github.com/selinon/selinon@master

Links to YAML config files:

Links to task implementations:

OpenTracing support

It is possible to trace flow (dispatcher and task) actions via callbacks. It would be nice to provide OpenTracing support on demand.

Enhance built-in predicates

The built-in predicates could be enhanced with predicates that inspect parent or failure nodes from flows. E.g.:

propagate_compound_parent: flow1
edges:
  - from:
    to: flow1
  - from: flow1
    to: Task2
    condition:
        in_parent:
          node: Task1
          parent_subflow: flow1

Introduce support for scheduling N tasks based on iterable

Currently, there is no way to define a flow which would spawn N tasks based on an iterable. A use case:

I want to spawn N-times Task2 based on results of Task1. The YAML configuration could be:

flow-definitions:
  - name: flow1
    edges:
      - from:
        to: Task1
      - from: Task1
        to: Task2
        condition:
          name: 'cond1'
        foreach:
          iterable: result # or node_args
          condition:
            name: cond2
            args:
              - foo
              - bar

Semantics:

When flow1 is started, Task1 is scheduled (condition is always true). When Task1 finishes, N tasks of type Task2 are scheduled so that:

  1. First, condition 'cond1' is evaluated.

  2. If cond1 is false, do not proceed, otherwise:

  3. For each result (or node_args) evaluate condition 'cond2' with result['foo']['bar'][i] (or node_args['foo']['bar'][i]) like:

    if cond1():
        for item in node_args['foo']['bar']:
            if cond2(item):
                Task2().apply_async(node_args)

This way, we can define flows which would look like:

flow1:
                                       |
                                     Task1
                                       |
    -----------------------------------------------------------------------
    |                |                 |                                  | 
  Task2            Task2             Task2                 ...            Task2

Detect dead paths in YAML configuration

For the following configuration:

tasks:
  - name: A
    import: foo.bar

  - name: B
    import: foo.bar

  - name: C
    import: foo.bar

  - name: D
    import: foo.bar

flows:
  - flow1

flow-definitions:
    - name: flow1
      edges:
        - from:
          to: A
        - from: A
          to: B
        - from:
            - B
            - C
          to: D
        - from: D
          to: C

selinonlib should report an error, as C and D will never be run due to their cyclic dependency. We should examine the reachability of graph nodes in the YAML config file.
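A reachability check of this kind could be sketched as a fixed-point iteration over the edge list. This is a minimal illustration, not selinonlib code: the function name unreachable_nodes and the tuple-based edge representation are hypothetical, and the sketch assumes that an edge with several source nodes fires only once all of them have been run, which is what makes C and D dead in the configuration above. Conditions on edges are ignored.

```python
from typing import Iterable, List, Sequence, Set, Tuple

Edge = Tuple[Sequence[str], Sequence[str]]  # (from nodes, to nodes)


def unreachable_nodes(nodes: Iterable[str], edges: List[Edge]) -> Set[str]:
    """Return nodes that can never be scheduled in the flow.

    An edge with an empty source list is a starting edge. Any other edge
    is assumed to fire only when all its source nodes are reachable.
    """
    reachable: Set[str] = set()
    changed = True
    while changed:
        changed = False
        for sources, targets in edges:
            if set(sources) <= reachable:
                new_nodes = set(targets) - reachable
                if new_nodes:
                    reachable |= new_nodes
                    changed = True
    return set(nodes) - reachable


# Edges of flow1 from the configuration above.
flow1_edges = [
    ([], ["A"]),
    (["A"], ["B"]),
    (["B", "C"], ["D"]),
    (["D"], ["C"]),
]

print(sorted(unreachable_nodes(["A", "B", "C", "D"], flow1_edges)))  # ['C', 'D']
```

The loop stops as soon as one full pass adds no new node, so it terminates even for cyclic flows.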

Why not just use a Celery task as a Selinon task?

In the past few days, I have been searching for and comparing distributed task flow engines that support dynamic linking. In the end, I found that Selinon is just what I want. It is a great job!
But after some work diving into Selinon, I have one question: why doesn't a Selinon task just use a Celery task?
Here is my idea:

  1. The Selinon dispatcher keeps using a Celery task for calling one flow, so when calling a flow, a new dispatcher task worker is invoked.
  2. A task in a flow is also a Celery task. So the Selinon dispatcher uses send_task to call tasks in the flow and subflow, and uses Celery's AsyncResult.get() to handle task dependencies. For example, if task3 depends on task1 and task2, just call async_result_of_task1.get() and async_result_of_task2.get(); the wait time is reasonable because the longer of task1 and task2 is always needed anyway. Retrying a task can be achieved with plain Celery task retries.
  3. One disadvantage of this model is the workflow throughput. The number of dispatcher workers needs to be greater than the maximum concurrency of all flow requests. But I think this can be solved by coroutines such as greenlet (not sure).

I know there may be some compelling reason why Selinon chose not to use Celery tasks; I wonder about the design considerations behind it.

Move cache definition to nodes.yaml

Let's move cache definition from flow to nodes.yaml. This way a cache instance wouldn't be bound to the particular flow but rather can be reused across different flows.

Introduce store_error_only configuration option

It would be nice to add support for store_error_only configuration option. Using this option will ensure that actual results are not stored in a storage, but rather only errors are stored using Storage.store_error() method.

Let a user run a specific task inside flow

Currently, the only execution unit that can be run from the user's point of view is a flow (a group of tasks). It would be nice to let a user run a specific task in a flow. This would require resolving all the tasks that are required (time or data dependencies of the targeted task), running them (ideally optionally) and optionally running tasks that are affected by the targeted task:

run_task('myFlow', task='Task3', node_args={'foo': 'bar'}, run_subsequent=True)
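Resolving the tasks a targeted task depends on amounts to collecting its transitive predecessors in the flow graph. The following is a minimal sketch of that step only; the function name required_nodes, the dict-based edge layout and the example flow are hypothetical and not part of Selinon, and run_task above is the API proposed in this issue rather than an existing one.

```python
from typing import Dict, List, Set


def required_nodes(edges: List[Dict[str, List[str]]], target: str) -> Set[str]:
    """Return all transitive predecessors of target in the flow graph."""
    # Map every node to the set of nodes it directly depends on.
    parents: Dict[str, Set[str]] = {}
    for edge in edges:
        for child in edge["to"]:
            parents.setdefault(child, set()).update(edge["from"])

    required: Set[str] = set()
    stack = [target]
    while stack:
        node = stack.pop()
        for parent in parents.get(node, ()):
            if parent not in required:
                required.add(parent)
                stack.append(parent)
    return required


# Hypothetical flow: Task1 -> Task2 -> Task3, and Task1 -> Task4.
my_flow_edges = [
    {"from": [], "to": ["Task1"]},
    {"from": ["Task1"], "to": ["Task2"]},
    {"from": ["Task2"], "to": ["Task3"]},
    {"from": ["Task1"], "to": ["Task4"]},
]

print(sorted(required_nodes(my_flow_edges, "Task3")))  # ['Task1', 'Task2']
```

Tasks affected by the targeted task (the run_subsequent=True case) could be found the same way by walking the edges in the opposite direction.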

Let a flow fail even if fallback was run

It would be nice to let a flow fail even if there was a failure in the flow and a fallback was started:

failures:
  - nodes:
     - task1
    fallback:
     - task1_fallback
    propagate_failure: true

Initialize Selinon based on already loaded configuration in a dict

Configuration can be initialized only based on explicitly supplied paths to configuration files. It would be nice to provide a function Config.from_dict() (refactor Config.from_files()) that would initialize configuration based on already (dynamically) generated configuration present in a dict (memory).

Add support for Cache

It would be nice to add cache support to storage pool. This way workers can hit cache and do not necessarily query database for already available results (in Dispatcher task and SelinonTaskEnvelope task as well).

Guidance for integrating Selinon as a library

This is a great project, and from reading the code it looks like it does almost everything I want to do. For my project I already have a DSL in YAML in which users can specify flows, and while trying to figure out whether there was a better workflow engine for Celery I found this, which is great! I can totally switch to using the Selinon YAML DSL.

But I am still struggling to figure out how I can integrate it with my project. As it is going to be a REST service (Flask/Redis/Postgres/Celery/Flower), I am trying to figure out the following:

  • How I can allow creating/editing/deleting of flows from an API
  • Triggering flows with different values
  • Monitoring the status of a flow, i.e. which stage is running, the status of a stage/flow, the result from a single stage or the entire flow
  • The ability to rerun failed stages or force a rerun of flows
  • Possibly how to extend it to send notifications via email about success/failure

Could you please point me in the right direction, either to the code blocks to use or, best of all, by extending the demo to show all the cool features which I see in the README and code?

Introduce fast-path fallback

Currently, the dispatcher runs defined fallbacks only if there is some failure in the flow and there are no active nodes. This can be optimized by introducing "fast-path fallbacks". What we can do:

  1. we already have a tree that is created of all possible permutations for defined failures
  2. if there is a path from the current failureNode to another failureNode using a currently active node in the system or its (indirect) transitive child, we cannot run the fallback; otherwise we can

Here is an example:

Let's say we have following dependencies defined for a flow in YAML file, where T1..3 are tasks (applicable for subflows as well).

from:
to:
 - T1
 - T2
 - T3
failures:
  - nodes:
    - T1
    - T2
    fallback:
    - T4

  - nodes:
    - T1
    fallback:
    - T4

  1. If T1 fails and T2, T3 are still active, we have to wait
  2. If T1 fails, T2 is finished successfully and T3 is still active, we can schedule T4 according to the second fallback
  3. If T1, T2 fail and T3 is still active, we can schedule T4 according to the first fallback

What needs to be kept in mind:

  • we have to inspect all possible subsequent nodes that can be scheduled - all transitive, even indirect
  • we have to inspect all possible recursive dependencies in flows - we can detect recursion which would block fast-path evaluation as optimization
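The decision rule from the example can be sketched as follows. This is one possible reading of the proposal and not dispatcher code: the function name fast_path_fallback, the data layout and the descendants mapping (all nodes an active node can still schedule, directly or indirectly) are hypothetical. The sketch picks the largest failure definition already covered by failed nodes and waits only if a strictly larger definition could still be completed by nodes that may yet fail.

```python
from typing import Dict, FrozenSet, List, Optional, Set


def fast_path_fallback(
    failed: Set[str],
    active: Set[str],
    failures: List[dict],
    descendants: Optional[Dict[str, Set[str]]] = None,
) -> Optional[List[str]]:
    """Return the fallback to schedule now, or None if we have to wait."""
    descendants = descendants or {}
    # Nodes that may still fail: active nodes and everything they can schedule.
    may_still_fail = set(active)
    for node in active:
        may_still_fail |= descendants.get(node, set())

    # Failure definitions fully covered by nodes that have already failed.
    matching = [entry for entry in failures if entry["nodes"] <= failed]
    if not matching:
        return None
    best = max(matching, key=lambda entry: len(entry["nodes"]))

    # Wait if a strictly larger definition could still become applicable.
    for entry in failures:
        remaining = entry["nodes"] - failed
        if entry["nodes"] > best["nodes"] and remaining <= may_still_fail:
            return None
    return best["fallback"]


# Failure definitions from the YAML example above.
failures = [
    {"nodes": frozenset({"T1", "T2"}), "fallback": ["T4"]},
    {"nodes": frozenset({"T1"}), "fallback": ["T4"]},
]

print(fast_path_fallback({"T1"}, {"T2", "T3"}, failures))  # None (case 1)
print(fast_path_fallback({"T1"}, {"T3"}, failures))        # ['T4'] (case 2)
print(fast_path_fallback({"T1", "T2"}, {"T3"}, failures))  # ['T4'] (case 3)
```

Recursive flows are not handled here; as noted above, detected recursion would have to disable the fast path.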

Using templates in yaml definition for variables ?

Wondering if selinon has plans to integrate jinja2 so users can use templates for variables and also ability to dynamically chain output of one task to input of other tasks or params for other tasks.

Refactor code (with Selinonlib)

I'm not happy about the current state of how Selinon/Selinonlib code is organized - there is a cyclic dependency (Selinon <-> Selinonlib). This situation resulted as a consequence of feature requirements that were incorporated into the code (simulator, migrations, ...). It would be great to reorganize code so there is no cyclic dependency and Selinonlib is rather a supportive library with cool stuff for Selinon.

Make Celery pluggable

It would be nice to abstract Celery and make it pluggable. With this change, it would be possible to define one's own backend that respects a certain interface (the relevant operations would be "get task status" and "schedule a task").

It would also simplify CLI execution, without any need to use Celery mocks.

Introduce timeout configuration option for tasks

It would be nice to support task timeouts so that task execution time cannot exceed a configured limit:

tasks:
  - name: 'Task1'
    import: 'myapp.tasks'
    timeout: 600

In that case a special exception would be raised (e.g. selinon.TimeoutError) and the timeout would be reported as task error (with respect to task's retry configuration).
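The intended behaviour can be illustrated with the standard library alone. This is only a sketch under stated assumptions, not how Selinon or Celery would implement it: TaskTimeoutError is a hypothetical stand-in for the proposed selinon.TimeoutError, run_with_timeout is a made-up helper, and because the work runs in a thread it is not interrupted on timeout; only the caller stops waiting.

```python
import concurrent.futures
import time


class TaskTimeoutError(Exception):
    """Hypothetical stand-in for the proposed selinon.TimeoutError."""


def run_with_timeout(func, timeout, *args, **kwargs):
    """Run func(*args, **kwargs) and raise TaskTimeoutError after timeout seconds."""
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # The worker thread keeps running; we only stop waiting for it.
        raise TaskTimeoutError("task exceeded %s seconds" % timeout) from None
    finally:
        pool.shutdown(wait=False)


try:
    # A task that sleeps for 0.5 s with a 0.05 s limit.
    run_with_timeout(time.sleep, 0.05, 0.5)
except TaskTimeoutError as exc:
    print("reported as task error:", exc)
```

In a real deployment the raised exception would then go through the task's retry configuration like any other task error.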

1.0.0rc1 breaks flow definitions

Hi there, a small heads-up and an incomplete issue description due to lack of time (sorry for that); the issue doesn't seem to persist in selinon@master.

If additional information is required, I'd be glad to help.

Issue description:

The CI/CD-Pipelines of our Selinon based project began to fail with a ConfigurationError after 1.0.0rc1 was released to pypi (we didn't use a fixed version in requirements.txt).

Steps to reproduce:

Update project dependencies/selinon and selinonlib from 0.1.0rc9 to 1.0.0rc1 via pip/pypi

Expected behavior:

Well I did expect that my flow definitions would still work.

Actual behavior:

Exits with ConfigurationError

Links to logs:

A link to logs posted on http://pastebin.com/ (or any other publicly reachable service).

16: [23:04:47] selinonlib.system from_files:1180: Parsing 'myflows.yaml'
16: [23:04:47] selinonlib.system _setup_flows:1148: Failed to parse flow definition for flow 'flow1', in file 'myflows.yaml'
16: [23:04:47] selinonlib-cli <module>:448: No flow edges provided in flow 'myflows.yaml'
Traceback (most recent call last):
  File "/usr/local/bin/selinonlib-cli", line 446, in <module>
    sys.exit(Command.main())
  File "/usr/local/bin/selinonlib-cli", line 441, in main
    return command(args)
  File "/usr/local/bin/selinonlib-cli", line 225, in plot
    system = System.from_files(args.nodes_definition, args.flow_definitions)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1187, in from_files
    cls._setup_flows(system, content, flow_file)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1144, in _setup_flows
    flow.parse_definition(flow_def, system)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/flow.py", line 120, in parse_definition
    raise ConfigurationError("No flow edges provided in flow %r" % self.name)
selinonlib.errors.ConfigurationError: No flow edges provided in flow 'flow1'

Are you able to reproduce the issue with selinon-cli execute?

not tested.

Selinon & Celery version:

can't tell right now but should be 1.0.0rc1

Running selinon-cli version gave me:

  • I have tried the latest master but the issue still persists:
    pip3 install --force-reinstall git+https://github.com/selinon/selinon@master
    THIS IS NOT THE CASE, ISSUE DOESN'T PERSIST IN SELINON@MASTER

Links to YAML config files:

myflows.yaml:

#[...]
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'task1'
      - from: 'task1'
        to: 'flow2'
      - from: 'flow2'
        to: 'task2'
#[...]

Links to task implementations:

not provided

Do not use autogenerated code

Currently, Selinon uses auto-generated code from selinonlib. It was nice for debugging the initial implementation, but we should avoid code auto-generation.
