selinon / selinon

An advanced distributed task flow management on top of Celery

Home Page: https://selinon.readthedocs.io/

License: Other

Makefile 0.27% Shell 0.13% Python 99.59% HTML 0.01%
schedule-tasks celery kubernetes flow-management distributed-computing openshift python task selinon big-data

selinon's Issues

Introduce fast-path fallback

Currently, the dispatcher runs defined fallbacks only if there is some failure in the flow and there are no active nodes. This can be optimized by introducing "fast-path fallbacks". What we can do:

  1. we already have a tree of all possible permutations of the defined failures
  2. if there is a path from the current failure node to another failure node through a currently active node in the system or its (possibly indirect) transitive child, we cannot run the fallback; otherwise we can

Here is an example:

Let's say we have the following dependencies defined for a flow in a YAML file, where T1..T3 are tasks (applicable for subflows as well).

from:
to:
  - T1
  - T2
  - T3
failures:
  - nodes:
      - T1
      - T2
    fallback:
      - T4

  - nodes:
      - T1
    fallback:
      - T4

  1. If T1 fails and T2, T3 are still active, we have to wait.
  2. If T1 fails, T2 finished successfully and T3 is still active, we can schedule T4 according to the second fallback.
  3. If T1 and T2 fail and T3 is still active, we can schedule T4 according to the first fallback.

What needs to be kept in mind:

  • we have to inspect all possible subsequent nodes that can be scheduled - all transitive children, even indirect ones
  • we have to inspect all possible recursive dependencies in flows - as an optimization, we can detect recursion, which would block fast-path evaluation (see the sketch below)
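
A minimal sketch of the check, assuming the failure permutation tree and the set of currently active nodes are available (all names here are illustrative, not the actual dispatcher internals):

    def can_run_fallback(failure_node, active_nodes, transitive_children):
        """Decide whether a fallback can fire before the flow goes idle.

        failure_node.to lists failure nodes reachable from the current one in
        the permutation tree; transitive_children[node] is the set of all
        (even indirectly) reachable children of an active node.
        """
        for other_failure in failure_node.to:
            affected = set(other_failure.traversed)
            for active in active_nodes:
                # an active node, or anything it can still spawn, could turn
                # this failure into a different (larger) failure node
                if active in affected or transitive_children[active] & affected:
                    return False
        return True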

Introduce support for scheduling N tasks based on iterable

Currently, there is no way to define a flow that would spawn N tasks based on an iterable. A use case:

I want to spawn Task2 N times based on the results of Task1. The YAML configuration could be:

flow-definition:
  - name: flow1
    edges:
      - from:
        to: Task1
      - from: Task1
        to: Task2
        condition:
          name: 'cond1'
        foreach:
          iterable: result # or node_args
          condition:
            name: cond2
            args:
             - foo
             - bar

Semantics:

When flow1 is started, Task1 is scheduled (condition is always true). When Task1 finishes, N tasks of type Task2 are scheduled so that:

  1. First, condition 'cond1' is evaluated.

  2. If cond1 is false, do not proceed; otherwise:

  3. For each item of result['foo']['bar'] (or node_args['foo']['bar']), evaluate condition 'cond2', like:

    if cond1():
        for item in node_args['foo']['bar']:
            if cond2(item):
                Task2().apply_async(node_args)

This way, we can define flows which would look like:

flow1:
                      |
                    Task1
                      |
      -------------------------------
      |       |       |             |
    Task2   Task2   Task2   ...   Task2

Initialize Selinon based on already loaded configuration in a dict

Currently, configuration can be initialized only from explicitly supplied paths to configuration files. It would be nice to provide a function Config.from_dict() (refactoring Config.from_files()) that would initialize the configuration from an already (dynamically) generated configuration held in a dict in memory.
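
A hypothetical usage sketch of the proposed API (Config.from_dict() does not exist yet):

    import yaml

    from selinon import Config

    with open('nodes.yaml') as f:
        config_dict = yaml.safe_load(f)  # or assemble the dict dynamically

    Config.from_dict(config_dict)  # proposed counterpart to Config.from_files()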

Provide an option for eager stopping on task/flow failures

Currently, if a node (flow or task) fails in a flow, the dispatcher keeps scheduling other tasks, so the flow does not depend on how long tasks run on different cluster nodes. There could, however, be use cases where it would be great to stop scheduling new tasks and rather mark the flow as failed immediately.
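
A sketch of what the option could look like in the flow definition (the flag name is hypothetical):

    flow-definitions:
      - name: 'flow1'
        eager_failure: true  # hypothetical: fail the flow as soon as any node fails
        edges:
          - from:
            to: 'Task1'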

Outdated documentation

The current state of the documentation is out of sync with the sources. It needs to be updated.

Tainted edges in migrations could cause flow retry/failure too early

As the state carried within a flow does not track whether a given edge was actually triggered or is just in the edge wait list (waiting_edges), a corner case can occur where a migration marks a flow as failed even though the edge was never triggered.

A fix for this should distinguish between triggered_edges and waiting_edges in the flow. The check for a tainted flow should then respect triggered_edges.
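
An illustrative sketch of the proposed state split (field names are a suggestion, not the actual implementation):

    state = {
        'waiting_edges': [0, 1, 2],  # edges registered for waiting
        'triggered_edges': [0],      # edges that actually fired
    }
    # a migration should consider the flow tainted only if an affected edge
    # appears in state['triggered_edges']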

Introduce store_error_only configuration option

It would be nice to add support for a store_error_only configuration option. Using this option would ensure that actual results are not stored in a storage; rather, only errors are stored using the Storage.store_error() method.
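
A sketch of how this could look in the YAML configuration (the option name and its per-task placement are assumptions):

    tasks:
      - name: 'Task1'
        import: 'myapp.tasks'
        storage: 'MyStorage'
        store_error_only: true  # persist only errors via Storage.store_error()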

1.0.0rc1 breaks flow definitions

Hi there, a small heads-up and an incomplete issue description due to lack of time (sorry for that); the issue doesn't seem to persist in selinon@master.

If additional information is required, I'd be glad to help.

Issue description:

The CI/CD pipelines of our Selinon-based project began to fail with a ConfigurationError after 1.0.0rc1 was released to PyPI (we didn't pin the version in requirements.txt).

Steps to reproduce:

Update the project dependencies (selinon and selinonlib) from 0.1.0rc9 to 1.0.0rc1 via pip/PyPI.

Expected behavior:

Well, I did expect my flow definitions to still work.

Actual behavior:

Exits with ConfigurationError

Links to logs:

A link to logs posted on http://pastebin.com/ (or any other publicly reachable service).

16: [23:04:47] selinonlib.system from_files:1180: Parsing 'myflows.yaml'
16: [23:04:47] selinonlib.system _setup_flows:1148: Failed to parse flow definition for flow 'flow1', in file 'myflows.yaml'
16: [23:04:47] selinonlib-cli <module>:448: No flow edges provided in flow 'myflows.yaml'
Traceback (most recent call last):
  File "/usr/local/bin/selinonlib-cli", line 446, in <module>
    sys.exit(Command.main())
  File "/usr/local/bin/selinonlib-cli", line 441, in main
    return command(args)
  File "/usr/local/bin/selinonlib-cli", line 225, in plot
    system = System.from_files(args.nodes_definition, args.flow_definitions)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1187, in from_files
    cls._setup_flows(system, content, flow_file)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1144, in _setup_flows
    flow.parse_definition(flow_def, system)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/flow.py", line 120, in parse_definition
    raise ConfigurationError("No flow edges provided in flow %r" % self.name)
selinonlib.errors.ConfigurationError: No flow edges provided in flow 'flow1'

Are you able to reproduce the issue with selinon-cli execute?

not tested.

Selinon & Celery version:

can't tell right now but should be 1.0.0rc1

Running selinon-cli version gave me:

  • I have tried the latest master; the issue does NOT persist there:
    pip3 install --force-reinstall git+https://github.com/selinon/selinon@master

Links to YAML config files:

myflows.yaml:

#[...]
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'task1'
      - from: 'task1'
        to: 'flow2'
      - from: 'flow2'
        to: 'task2'
#[...]

Links to task implementations:

not provided

Move cache definition to nodes.yaml

Let's move the cache definition from the flow to nodes.yaml. This way a cache instance wouldn't be bound to a particular flow, but could rather be reused across different flows.
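
A sketch of what the cache configuration could look like once moved to nodes.yaml (the top-level key and the exact fields are illustrative):

    cache-definitions:
      - name: 'Cache1'
        import: 'selinonlib.caches'
        classname: 'LRU'
        configuration:
          max_cache_size: 10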

Add support for Cache in task results

With #14 implemented, caches could be reused to cache results of tasks (Celery tasks). The difference is that results could be cached only once a Celery task is marked as finished. This caching mechanism could be useful for the dispatcher when checking the state of tasks.
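
A sketch of the idea (illustrative, not the actual dispatcher code) - consult the cache first and cache only terminal Celery states:

    def get_task_state(async_result, cache):
        cached = cache.get(async_result.id)
        if cached is not None:
            return cached
        state = async_result.state
        if state in ('SUCCESS', 'FAILURE'):  # cache only final states
            cache.add(async_result.id, state)
        return state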

Enhance built-in predicates

There is a possibility of enhancing the built-in predicates with predicates that would inspect parent or failure nodes from flows. E.g.:

propagate_compound_parent: flow1
edges:
  - from:
    to: flow1
  - from: flow1
    to: Task2
    condition:
      in_parent:
        node: Task1
        parent_subflow: flow1

Introduce timeout configuration option for tasks

It would be nice to support task timeouts so that task execution time cannot exceed a configured limit:

tasks:
  - name: 'Task1'
    import: 'myapp.tasks'
    timeout: 600

In that case a special exception would be raised (e.g. selinon.TimeoutError) and the timeout would be reported as a task error (with respect to the task's retry configuration).
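
A minimal sketch of how such a timeout could be enforced on POSIX systems using SIGALRM (TimeoutError here stands in for the proposed selinon.TimeoutError; this approach only works in the worker's main thread):

    import signal

    class TimeoutError(Exception):
        """Proposed exception raised when a task exceeds its timeout."""

    def _on_timeout(signum, frame):
        raise TimeoutError("task execution exceeded the configured timeout")

    def run_with_timeout(task_run, node_args, timeout):
        signal.signal(signal.SIGALRM, _on_timeout)
        signal.alarm(timeout)  # deliver SIGALRM after `timeout` seconds
        try:
            return task_run(node_args)
        finally:
            signal.alarm(0)  # always cancel the pending alarm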

Detect dead paths in YAML configuration

For the following configuration:

tasks:
  - name: A
    import: foo.bar

  - name: B
    import: foo.bar

  - name: C
    import: foo.bar

  - name: D
    import: foo.bar

flows:
  - flow1

flow-definitions:
    - name: flow1
      edges:
        - from:
          to: A
        - from: A
          to: B
        - from:
            - B
            - C
          to: D
        - from: D
          to: C

Selinonlib should report an error, as C and D will never be run due to the cyclic dependency. We should examine the reachability of graph nodes in the YAML config file (see the sketch below).
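
A minimal reachability check over the flow edges (illustrative, not the actual selinonlib implementation) - keep firing every edge whose source nodes are all reachable and report whatever never becomes reachable:

    def unreachable_nodes(edges, all_nodes):
        # each edge is (source_nodes, destination_nodes); an empty source
        # list represents the starting edge of the flow
        reachable = set()
        changed = True
        while changed:
            changed = False
            for sources, destinations in edges:
                if all(s in reachable for s in sources):
                    new = set(destinations) - reachable
                    if new:
                        reachable |= new
                        changed = True
        return set(all_nodes) - reachable

    edges = [([], ['A']), (['A'], ['B']), (['B', 'C'], ['D']), (['D'], ['C'])]
    print(unreachable_nodes(edges, ['A', 'B', 'C', 'D']))  # -> {'C', 'D'}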

Let a flow fail even if fallback was run

It would be nice to let a flow fail even if there was a failure in the flow and a fallback was started:

failures:
  - nodes:
     - task1
    fallback:
     - task1_fallback
    propagate_failure: true

Fatal dispatcher failure when there are issues with DB/storage

Currently, the dispatcher will fail if there are issues with the DB/storage. For example, if a flow is scheduled and it is not possible to retrieve results from the database in order to evaluate an edge condition, the dispatcher task will fail, causing a fatal flow failure. It should retry by default (possibly on another cluster node), or at least make this behavior configurable.
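
A sketch of the suggested behavior (illustrative; StorageError and retrieve_results are placeholders, not the actual Selinon API):

    from celery import Celery

    app = Celery('sketch')

    class StorageError(Exception):
        """Placeholder for whatever the storage adapter raises."""

    def retrieve_results(flow_name, state):
        raise StorageError('database not reachable')  # placeholder

    @app.task(bind=True, max_retries=None)
    def dispatcher(self, flow_name, state):
        try:
            return retrieve_results(flow_name, state)
        except StorageError as exc:
            # requeue the dispatcher (possibly picked up on another cluster
            # node) instead of failing the whole flow fatally
            raise self.retry(exc=exc, countdown=2)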

why not just use celery task as selinon task?

In the past few days, I have been searching for and comparing distributed task flow engines that support dynamic linking. At last, I found that Selinon is just what I want. It is a great piece of work!
But after some time diving into Selinon, I have one question: why doesn't a Selinon task just use a Celery task?
Here is my idea:

  1. The Selinon dispatcher keeps using a Celery task for calling one flow, so when a flow is called, a new dispatcher task worker is invoked.
  2. A task in a flow is also a Celery task, so the Selinon dispatcher uses send_task to call tasks in the flow and its subflows, and uses Celery's AsyncResult.get() to handle task dependencies. For example, task3 depends on task1 and task2, so just call async_result_of_task1.get() and async_result_of_task2.get(); the wait time is reasonable because the longer of task1 and task2 always has to be waited for anyway. Task retries can be achieved with plain Celery task retries (see the sketch below).
  3. One disadvantage of this model is workflow throughput: the number of dispatcher workers needs to be greater than the maximum concurrency of all flow requests. But I think this can be solved by coroutines such as greenlet (not sure).

I know there may be some compelling reasons why Selinon chose not to use Celery tasks this way; I wonder about the design considerations behind it.
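
A sketch of the proposed model using plain Celery primitives (illustrative; note that Celery discourages calling .get() inside a task - it raises an error unless disable_sync_subtasks=False is passed - which hints at one design consideration behind the dispatcher-based approach):

    from celery import Celery

    app = Celery('proposal', broker='redis://localhost')

    def run_flow():
        r1 = app.send_task('task1')
        r2 = app.send_task('task2')
        # task3 depends on task1 and task2: block until both finish
        r1.get(disable_sync_subtasks=False)
        r2.get(disable_sync_subtasks=False)
        app.send_task('task3')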

Make Celery pluggable

It would be nice to abstract Celery away and make it pluggable. With this change, it would be possible to define one's own backend that respects a certain interface (the relevant operations being "get task status" and "schedule a task").

It would also simplify CLI execution without any need to use Celery mocks.
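
One possible shape of the pluggable interface (names are illustrative):

    from abc import ABC, abstractmethod

    class ExecutorBackend(ABC):
        """Abstract scheduling backend Selinon could use instead of Celery."""

        @abstractmethod
        def schedule_task(self, task_name, arguments):
            """Schedule a task and return an opaque task id."""

        @abstractmethod
        def get_task_status(self, task_id):
            """Return the status of a previously scheduled task."""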

Add support for Cache

It would be nice to add cache support to the storage pool. This way workers can hit the cache and do not necessarily need to query the database for already available results (in the Dispatcher task as well as in the SelinonTaskEnvelope task).
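
An illustrative read path with a cache in front of the storage (method names are assumptions, not the actual StoragePool API):

    def retrieve_cached(storage, cache, flow_name, task_name, task_id):
        try:
            return cache.get(task_id)
        except KeyError:  # cache miss
            result = storage.retrieve(flow_name, task_name, task_id)
            cache.add(task_id, result)
            return result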

Using templates in YAML definitions for variables?

Wondering if Selinon has plans to integrate Jinja2 so users can use templates for variables, and also gain the ability to dynamically chain the output of one task to the input of, or parameters for, other tasks.
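
A hypothetical sketch of what Jinja2-templated configuration could look like (Selinon does not support this today):

    tasks:
      - name: '{{ task_name }}'
        import: '{{ tasks_module }}'
        queue: '{{ queue_prefix }}_{{ task_name }}'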

Refactor code (with Selinonlib)

I'm not happy with the current state of how the Selinon/Selinonlib code is organized - there is a cyclic dependency (Selinon <-> Selinonlib). This situation arose as a consequence of feature requirements that were incorporated into the code (simulator, migrations, ...). It would be great to reorganize the code so that there is no cyclic dependency and Selinonlib is rather a supporting library with useful extras for Selinon.

How to call task flows defined in a YAML config file from a remote server?

Issue description:

There is no documentation describing how to call flows defined in a YAML config file from a remote server.

Steps to reproduce:

Expected behavior:

Actual behavior:

Links to logs:

A link to logs posted on http://pastebin.com/ (or any other publicly reachable service).

Are you able to reproduce the issue with selinonlib-cli simulate?

Selinon & Selinonlib & Celery version:

Running selinonlib-cli version gave me:

  • I have tried the latest master but the issue still persists:
    pip3 install --force-reinstall git+https://github.com/selinon/selinonlib@master
    pip3 install --force-reinstall git+https://github.com/selinon/selinon@master

Links to YAML config files:

Links to task implementations:

Do not use autogenerated code

Currently, Selinon uses auto-generated code from selinonlib. It was nice for debugging the initial implementation, but we should avoid code auto-generation.

OpenTracing support

It is possible to trace flow (dispatcher and task) actions via callbacks. It would be nice to provide OpenTracing support on demand.
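
A hedged sketch of wiring the existing tracing callbacks into OpenTracing (assuming a callback-registration hook on the Selinon side and a configured opentracing tracer; the callback signature shown is illustrative):

    import opentracing

    def trace_to_opentracing(event, msg_dict):
        # emit one span per traced event, tagging it with the trace payload
        with opentracing.global_tracer().start_active_span(str(event)) as scope:
            for key, value in msg_dict.items():
                scope.span.set_tag(key, str(value))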

Let a user run a specific task inside flow

Currently, the only execution unit that can be run from the user's point of view is a flow (a group of tasks). It would be nice to let a user run a specific task in a flow. This will require resolving all the tasks that are required (time or data dependencies of the targeted task), running them (ideally optionally), and optionally running tasks that are affected by the targeted task:

run_task('myFlow', task='Task3', node_args={'foo': 'bar'}, run_subsequent=True)

Guidance for integrating Selinon as a library

This is a great project; reading the code, it looks like it does almost everything I want. For my project I already have a YAML DSL in which users can specify flows, and while trying to figure out if there was a better workflow engine for Celery I found this - which is great! I can totally switch to using the Selinon YAML DSL.

But I am still struggling to figure out how to integrate it with my project. As it is going to be a REST service (Flask/Redis/Postgres/Celery/Flower), I am trying to figure out the following:

  • how to allow creating/editing/deleting flows from the API
  • triggering flows with different values
  • monitoring the status of a flow, i.e. which stage is running, the status of a stage/flow, and the result from a single stage or the entire flow
  • the ability to rerun failed stages or force a rerun of flows
  • possibly how to extend it to send notifications via email about success/failure

Could you please point me in the right direction - either which code blocks to use, or, best of all, extend the demo to show all the cool features I see in the README and the code.

Return code of pylint is ignored

With change #11, CI runs pylint, but its return code is ignored as there are currently errors. Once the errors are fixed, pylint should be enforced.
