selinon / selinon
An advanced distributed task flow management on top of Celery
Home Page: https://selinon.readthedocs.io/
License: Other
Currently, the dispatcher runs defined fallbacks only if there is some failure in the flow and there are no active nodes. This can be optimized by introducing "fast-path fallbacks". What we can do:
Here is an example:
Let's say we have the following dependencies defined for a flow in a YAML file, where T1..T3 are tasks (this is applicable to subflows as well).
from:
to:
  - T1
  - T2
  - T3
failures:
  - nodes:
      - T1
      - T2
    fallback:
      - T4
  - nodes:
      - T1
    fallback:
      - T4
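A minimal sketch of how such a fast-path check could work, assuming the dispatcher tracks the set of currently failed nodes (the data structures here are illustrative, not Selinon's internals): as soon as the failed set covers a defined failure combination, the fallback can be scheduled without waiting for the remaining active nodes.

```python
# Illustrative failure definitions mirroring the YAML above.
failures = [
    {"nodes": {"T1", "T2"}, "fallback": ["T4"]},
    {"nodes": {"T1"}, "fallback": ["T4"]},
]

def fast_path_fallback(failed_nodes):
    """Return the fallback for the largest failure combination already covered."""
    candidates = [f for f in failures if f["nodes"] <= failed_nodes]
    if not candidates:
        return None
    # Prefer the most specific (largest) matching combination.
    best = max(candidates, key=lambda f: len(f["nodes"]))
    return best["fallback"]

print(fast_path_fallback({"T1"}))        # ['T4'] via the single-node rule
print(fast_path_fallback({"T1", "T2"}))  # ['T4'] via the two-node rule
```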
What needs to be kept in mind:
As Selinonlib is now merged into Selinon, there is no need to translate the configuration to Python code, write it to a file, and then load and parse it again. Instead, the configuration could be initialized directly.
Currently, there is no way to define a flow that would spawn N tasks based on an iterable. A use case:
I want to spawn Task2 N times based on the results of Task1. The YAML configuration could be:
flow-definitions:
  - name: flow1
    edges:
      - from:
        to: Task1
      - from: Task1
        to: Task2
        condition:
          name: 'cond1'
        foreach:
          iterable: result  # or node_args
          condition:
            name: cond2
            args:
              - foo
              - bar
Semantics:
When flow1 is started, Task1 is scheduled (its condition is always true). When Task1 finishes, N tasks of type Task2 are scheduled as follows:
First, condition 'cond1' is evaluated.
If cond1 is false, do not proceed; otherwise:
For each item i, evaluate condition 'cond2' with result['foo']['bar'][i] (or node_args['foo']['bar'][i]), like:
if cond1():
    for item in node_args['foo']['bar']:
        if cond2(item):
            Task2().apply_async(node_args)
This way, we can define flows which would look like:
flow1:
|
Task1
|
-----------------------------------------------------------------------
| | | |
Task2 Task2 Task2 ... Task2
Selinon and Selinonlib currently install all of their dependencies (redis, celery, ...). As Selinon can be run as a standalone CLI application without a broker, Celery's result backend, etc., it would be nice to install dependencies on demand - extras as defined in the PEP 508 standard could be used.
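As a sketch, on-demand dependencies could be declared via setuptools extras; the extra names and version pins below are illustrative assumptions, not Selinon's actual ones.

```python
# Illustrative extras mapping for a setup.py; `pip install selinon[celery]`
# would then pull in only the optional Celery dependencies.
# The extra names and pins here are assumptions, not Selinon's real ones.
EXTRAS_REQUIRE = {
    'celery': ['celery>=4.0'],   # broker-based distributed execution
    'redis': ['redis'],          # Redis result storage
    'postgres': ['SQLAlchemy'],  # SQL result storage
}

# In setup.py this would be passed as: setup(..., extras_require=EXTRAS_REQUIRE)
print(sorted(EXTRAS_REQUIRE))  # ['celery', 'postgres', 'redis']
```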
Configuration can be initialized only based on explicitly supplied paths to configuration files. It would be nice to provide a function Config.from_dict() (refactoring Config.from_files()) that would initialize configuration based on an already (dynamically) generated configuration present in a dict (in memory).
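A minimal sketch of what the proposed API could look like; the class layout and key names are assumptions for illustration, not the actual Selinon implementation.

```python
# Hypothetical sketch of the proposed Config.from_dict(); key names follow
# the YAML configuration layout ('tasks', 'flow-definitions') but the class
# structure is an assumption, not Selinon's real Config.
class Config:
    def __init__(self, nodes_definition, flow_definitions):
        self.nodes_definition = nodes_definition
        self.flow_definitions = flow_definitions

    @classmethod
    def from_dict(cls, config):
        """Initialize directly from an in-memory dict, no files involved."""
        return cls(config['tasks'], config['flow-definitions'])

config = Config.from_dict({
    'tasks': [{'name': 'Task1', 'import': 'myapp.tasks'}],
    'flow-definitions': [{'name': 'flow1', 'edges': []}],
})
print(config.nodes_definition[0]['name'])  # Task1
```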
Prepared storages are not safe to be used with Celery when concurrency is set (e.g. 2+).
Currently, if a node (flow or task) fails in a flow, the dispatcher keeps scheduling other tasks, so the flow's outcome does not depend on task execution times across different cluster nodes. There could however be use cases where it would be great to stop scheduling new tasks and instead mark the flow as failed immediately.
The current state of documentation is out of sync compared to sources. Needs to be updated.
Currently, one of the major limitations Selinon has is the fact that results of parent sub-flows cannot be used in conditions. It would be nice to include such capability.
Currently, the dispatcher keeps a task's result in cache even if the task has no storage associated. This can introduce a lot of trouble and unexpected behaviour in a distributed environment.
SSIA
As the state carried within a flow does not track whether a given edge was actually triggered or is just in the edge wait list (waiting_edges), a corner case could occur where a migration marks a flow as failed even though the edge was not triggered.
A fix for this should distinguish between triggered_edges and waiting_edges in the flow. The check for a tainted flow should then respect triggered_edges.
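The proposed distinction can be sketched as follows, with the edge bookkeeping reduced to plain sets for illustration (these are not Selinon's actual internals):

```python
# Minimal sketch: a migration should only taint a flow based on edges that
# actually fired, not on edges the dispatcher is merely waiting on.
waiting_edges = {0, 1, 2}   # edge indices the dispatcher may still fire
triggered_edges = {0}       # edge indices that have actually fired

def migration_taints_flow(affected_edges):
    """A migration taints the flow only if it touches a triggered edge."""
    return bool(affected_edges & triggered_edges)

print(migration_taints_flow({1, 2}))  # False: those edges were only waited on
print(migration_taints_flow({0}))     # True: edge 0 actually fired
```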
It would be nice to regenerate all flow graphs so they respect the current graph plotting style.
Let's make it possible to propagate finished and failed nodes from subflows. This way a user can get information about finished and failed nodes in the parent flow.
SSIA
Explore possibilities to enhance logging of tracepoints using daiquiri [1]. This enhanced logging lib can also be used in Selinonlib. See [2] for examples.
Thanks to @jd.
[1] https://pypi.python.org/pypi/daiquiri
[2] https://julien.danjou.info/blog/python-logging-easy-with-daiquiri
It would be great to move away from the current CamelCase module naming and use snake_case as described in the PEP 8 standard.
It would be nice to add support for a store_error_only configuration option. Using this option would ensure that actual results are not stored in a storage; rather, only errors are stored using the Storage.store_error() method.
Currently, there is no possibility to define a condition on edge that was triggered by task error.
Hi there, a small heads up and an incomplete issue description due to lack of time (sorry for that); the issue doesn't seem to persist in selinon@master.
If additional information is required, I'd be glad to help.
The CI/CD pipelines of our Selinon-based project began to fail with a ConfigurationError after 1.0.0rc1 was released to PyPI (we didn't use a fixed version in requirements.txt).
Update project dependencies/selinon and selinonlib from 0.1.0rc9 to 1.0.0rc1 via pip/pypi
Well I did expect that my flow definitions would still work.
Exits with ConfigurationError
16: [23:04:47] selinonlib.system from_files:1180: Parsing 'myflows.yaml'
16: [23:04:47] selinonlib.system _setup_flows:1148: Failed to parse flow definition for flow 'flow1', in file 'myflows.yaml'
16: [23:04:47] selinonlib-cli <module>:448: No flow edges provided in flow 'myflows.yaml'
Traceback (most recent call last):
  File "/usr/local/bin/selinonlib-cli", line 446, in <module>
    sys.exit(Command.main())
  File "/usr/local/bin/selinonlib-cli", line 441, in main
    return command(args)
  File "/usr/local/bin/selinonlib-cli", line 225, in plot
    system = System.from_files(args.nodes_definition, args.flow_definitions)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1187, in from_files
    cls._setup_flows(system, content, flow_file)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/system.py", line 1144, in _setup_flows
    flow.parse_definition(flow_def, system)
  File "/usr/local/lib/python3.6/site-packages/selinonlib/flow.py", line 120, in parse_definition
    raise ConfigurationError("No flow edges provided in flow %r" % self.name)
selinonlib.errors.ConfigurationError: No flow edges provided in flow 'flow1'
selinon-cli execute? Not tested.
Running selinon-cli version gave me: can't tell right now, but should be 1.0.0rc1
pip3 install --force-reinstall git+https://github.com/selinon/selinon@master
myflows.yaml:
#[...]
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'task1'
      - from: 'task1'
        to: 'flow2'
      - from: 'flow2'
        to: 'task2'
#[...]
It would be nice to have a logger that directly emits JSON consumable by the Elasticsearch JSON parser.
Let's move the cache definition from the flow to nodes.yaml. This way a cache instance wouldn't be bound to a particular flow but could be reused across different flows.
With #14 implemented, caches could be reused to cache results of tasks (Celery tasks). The difference is that results could be cached only if a Celery task is marked as finished. This caching mechanism could be useful for the dispatcher in order to check the state of tasks.
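The finished-only caching rule could be sketched like this; the state names follow Celery's ready states, but the cache class itself is purely illustrative:

```python
# Sketch of result caching gated on a final state: a result is cached only
# once the Celery task has finished, so in-flight states are never cached.
FINAL_STATES = {'SUCCESS', 'FAILURE', 'REVOKED'}  # Celery's ready states

class FinishedResultCache:
    def __init__(self):
        self._cache = {}

    def maybe_cache(self, task_id, state, result):
        """Cache the result only if the task reached a final state."""
        if state in FINAL_STATES:
            self._cache[task_id] = result

    def get(self, task_id):
        return self._cache.get(task_id)

cache = FinishedResultCache()
cache.maybe_cache('t1', 'STARTED', None)  # not cached: task still running
cache.maybe_cache('t1', 'SUCCESS', 42)    # cached: task finished
print(cache.get('t1'))  # 42
```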
It would be nice to provide generic info in __init__.py, e.g.:
__version__ = '1.0.0'
__version_info__ = __version__.split('.')
__title__ = 'selinon'
__author__ = 'Fridolin Pokorny'
__license__ = 'BSD'
__copyright__ = 'Copyright 2017 Fridolin Pokorny'
There is a possibility of enhancing the built-in predicates with predicates that would inspect parent or failure nodes from flows. E.g.:
propagate_compound_parent: flow1
edges:
  - from:
    to: flow1
  - from: flow1
    to: Task2
    condition:
      in_parent:
        node: Task1
        parent_subflow: flow1
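Such an in_parent predicate could be sketched as below; the structure of the parent mapping and the function signature are assumptions for illustration, not Selinon's actual predicate API.

```python
# Hypothetical sketch of an `in_parent` predicate: check whether a given node
# finished in a parent (sub)flow, based on a parent mapping of the form
# {subflow_name: {task_name: task_id}} (an assumed, illustrative structure).
def in_parent(parent, node, parent_subflow=None):
    finished = parent.get(parent_subflow, {}) if parent_subflow else parent
    return node in finished

parent = {'flow1': {'Task1': '<task-id-1>'}}
print(in_parent(parent, 'Task1', parent_subflow='flow1'))  # True
print(in_parent(parent, 'Task2', parent_subflow='flow1'))  # False
```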
Currently, a tree of all permutations is pre-computed to support fallbacks. It's worth considering sticking with a hashed frozen set instead.
It would be nice to support task timeouts so that task execution time cannot exceed a configured limit:
tasks:
  - name: 'Task1'
    import: 'myapp.tasks'
    timeout: 600
In that case a special exception would be raised (e.g. selinon.TimeoutError) and the timeout would be reported as a task error (with respect to the task's retry configuration).
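One possible way to enforce such a timeout is sketched below using SIGALRM (POSIX only); the exception name is illustrative, standing in for the proposed selinon.TimeoutError:

```python
import signal

class TaskTimeoutError(Exception):
    """Illustrative stand-in for the proposed selinon.TimeoutError."""

def run_with_timeout(func, timeout, *args, **kwargs):
    """Run func, raising TaskTimeoutError if it exceeds `timeout` seconds."""
    def _handler(signum, frame):
        raise TaskTimeoutError("task exceeded %d seconds" % timeout)

    old_handler = signal.signal(signal.SIGALRM, _handler)
    signal.alarm(timeout)            # arm the timeout
    try:
        return func(*args, **kwargs)
    finally:
        signal.alarm(0)              # cancel the pending alarm
        signal.signal(signal.SIGALRM, old_handler)

print(run_with_timeout(lambda: "ok", 5))  # ok
```

The raised exception would then flow through the normal task-failure path, so the task's retry configuration applies unchanged.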
Currently, there is no possibility to get info about exceptions that were raised in tasks. To be more precise, it is possible using Celery routines, but it would be worth encapsulating this in the Selinon API.
For the following configuration:
tasks:
  - name: A
    import: foo.bar
  - name: B
    import: foo.bar
  - name: C
    import: foo.bar
  - name: D
    import: foo.bar
flows:
  - flow1
flow-definitions:
  - name: flow1
    edges:
      - from:
        to: A
      - from: A
        to: B
      - from:
          - B
          - C
        to: D
      - from: D
        to: C
selinonlib should report an error, as C and D will never run due to the cyclic dependency. We should examine the reachability of graph nodes in the YAML config file.
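Such a reachability check could be sketched as a simple fixed-point iteration over the edge definitions (the data structures are illustrative, not selinonlib's internals): a node is reachable only if some edge leading to it can fire, and an edge can fire only once all of its source nodes are reachable.

```python
# Each edge is (sources, destinations); an empty source list means the edge
# fires when the flow starts. These mirror the flow1 configuration above.
edges = [
    ([], ["A"]),
    (["A"], ["B"]),
    (["B", "C"], ["D"]),
    (["D"], ["C"]),
]

def unreachable_nodes(edges):
    """Return nodes that can never run given the edge definitions."""
    all_nodes = {n for src, dst in edges for n in src + dst}
    reachable = set()
    changed = True
    while changed:
        changed = False
        for src, dst in edges:
            # an edge can fire only once all of its source nodes have run
            if all(s in reachable for s in src):
                for d in dst:
                    if d not in reachable:
                        reachable.add(d)
                        changed = True
    return all_nodes - reachable

print(sorted(unreachable_nodes(edges)))  # ['C', 'D'] can never run
```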
It would be nice to let a flow fail even if there was a failure in the flow and a fallback was started:
failures:
  - nodes:
      - task1
    fallback:
      - task1_fallback
    propagate_failure: true
For me, the first impression is everything. Right now, the readme doesn't engage me in any way. Including a simple example (ideally with a diagram) would definitely do the trick.
Currently, the dispatcher will fail if there are issues with the DB/storage. For example, if a flow is scheduled and it is not possible to retrieve results from the database in order to evaluate an edge condition, the dispatcher task will fail, causing a fatal flow failure. It should retry (possibly on another cluster node) by default, or make this configurable.
In the past few days, I have been searching for and comparing distributed task flow engines that support dynamic linking. In the end, I found that Selinon is just what I want. It is a great piece of work!
But after some work diving into Selinon, I have one question: why doesn't a Selinon task just use a Celery task?
Here is my idea:
I know there may be compelling reasons why Selinon chose not to use Celery tasks directly; I wonder about the design considerations behind it.
Selinonlib already parses time_limit for tasks, we should respect it in SelinonTaskEnvelope.
SSIA [1]
It would be nice to abstract Celery away and make it pluggable. With this change, it would be possible to define one's own backend respecting a certain interface (the relevant operations being "get task status" and "schedule a task").
It would also simplify CLI execution without any need to use Celery mocks.
It would be nice to add cache support to the storage pool. This way workers can hit the cache and do not necessarily have to query the database for already available results (in the Dispatcher task as well as in SelinonTaskEnvelope).
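A minimal sketch of such a cache layer in front of the storage pool (class and method names are illustrative, not Selinon's actual storage API):

```python
class CachedStoragePool:
    """Cache layer in front of a storage adapter (illustrative sketch)."""

    def __init__(self, storage):
        self._storage = storage  # any object with retrieve(flow, task, task_id)
        self._cache = {}

    def retrieve(self, flow_name, task_name, task_id):
        key = (flow_name, task_name, task_id)
        if key not in self._cache:
            # cache miss: fall back to the (possibly remote) database
            self._cache[key] = self._storage.retrieve(flow_name, task_name, task_id)
        return self._cache[key]

class FakeStorage:
    """Stand-in database adapter that counts queries."""

    def __init__(self):
        self.queries = 0

    def retrieve(self, flow_name, task_name, task_id):
        self.queries += 1
        return {'result': 42}

db = FakeStorage()
pool = CachedStoragePool(db)
pool.retrieve('flow1', 'Task1', 'id-1')
pool.retrieve('flow1', 'Task1', 'id-1')  # served from the cache
print(db.queries)  # 1: the database was queried only once
```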
Just make module names more readable.
Wondering if Selinon has plans to integrate Jinja2 so users can use templates for variables, plus the ability to dynamically chain the output of one task into the input or parameters of other tasks.
I'm not happy about the current state of how Selinon/Selinonlib code is organized - there is a cyclic dependency (Selinon <-> Selinonlib). This situation resulted as a consequence of feature requirements that were incorporated into the code (simulator, migrations, ...). It would be great to reorganize code so there is no cyclic dependency and Selinonlib is rather a supportive library with cool stuff for Selinon.
There is no document describing how to run flows defined in the YAML config file.
selinonlib-cli simulate
Running selinonlib-cli version gave me:
pip3 install --force-reinstall git+https://github.com/selinon/selinonlib@master
pip3 install --force-reinstall git+https://github.com/selinon/selinon@master
Currently, Selinon uses auto-generated code from selinonlib. It was nice for debugging the initial implementation, but we should avoid code auto-generation.
It is possible to trace flow (dispatcher and task) actions via callbacks. It would be nice to provide OpenTracing support on demand.
Currently, the only execution unit that can be run from the user's point of view is a flow (a group of tasks). It would be nice to let a user run a specific task in a flow. This would require resolving all the tasks the targeted task depends on (time or data dependencies), running them (ideally optionally), and optionally running the tasks affected by the targeted task:
run_task('myFlow', task='Task3', node_args={'foo': 'bar'}, run_subsequent=True)
I would like to ask for a new release which includes the recent changes around Sentry configuration through environment variables.
Thank you!
This is a great project; reading the code, it looks like it does almost everything I want. For my project I already have a YAML DSL in which users can specify flows, and while trying to find a better workflow engine for Celery I found this, which is great! I can totally switch to using the Selinon YAML DSL.
But I am still struggling to figure out how to integrate it with my project. As it is going to be a REST service (Flask/Redis/Postgres/Celery/Flower), I am trying to figure out the following:
Please can you point me in the right direction, either to which code blocks to use or, ideally, by extending the demo to show all the cool features I see in the readme and code.
SSIA
With change #11, CI runs pylint, but its result is ignored as there are currently errors. Once the errors are fixed, pylint should be enforced.