nipype / pydra
Pydra Dataflow Engine
Home Page: https://nipype.github.io/pydra/
License: Other
Something to consider: MagicStack/uvloop
uvloop makes asyncio 2-4x faster
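A minimal sketch of what adopting it would look like (uvloop is a drop-in event loop replacement; nothing here is pydra-specific):
import asyncio
import uvloop

# route all subsequently created asyncio event loops through uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())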
@satra -- I've been working on the profiler (i.e. via network flow optimization using a combo of dask + networkx) and am wondering if you already have an API on hand for formatting networkx-based DAGs in a dask-friendly format (see: https://docs.dask.org/en/latest/optimize.html#definitions and https://docs.dask.org/en/latest/custom-graphs.html)? If not, I can/will write one.
Not sure what your thoughts are on DAG optimization using dask, but it offers considerable flexibility: https://docs.dask.org/en/latest/optimize.html#customizing-optimization. Curious to get your thoughts...
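For concreteness, a hypothetical converter, assuming each networkx node stores its callable in a "func" attribute and source nodes store a "value" (these attribute names are illustrative, not an existing pydra API):

import dask
import networkx as nx

def nx_to_dask_graph(g):
    # dask custom graphs are dicts of key -> (func, *dependency_keys)
    dsk = {}
    for node in nx.topological_sort(g):
        func = g.nodes[node].get("func")
        deps = list(g.predecessors(node))
        dsk[node] = (func, *deps) if func is not None else g.nodes[node]["value"]
    return dsk

# dask.get(dsk, "final_key") would then execute the graph with the synchronous scheduler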
It would be handy to be able to create a Node or Workflow instance and then run it with .run(*args, **kwds).
Currently things break if workingdir is set to None (the default). I suggest keeping the default as None but using a temporary directory if workingdir is None.
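A minimal sketch of the suggested fallback, using only the stdlib (the helper name is hypothetical):

import tempfile

def resolve_workingdir(workingdir=None):
    # keep None as the public default, but never actually run with None:
    # fall back to a fresh temporary directory instead
    return workingdir if workingdir is not None else tempfile.mkdtemp()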
As noted in #81 (comment), I would suggest that we try out Azure Pipelines instead of AppVeyor.
Perhaps a good first step is for somebody with access to a Windows computer to try running the tests and estimate how big a job it is going to be to get us to compatibility. If that is going to be the struggle, learning a new CI infrastructure might be too much to add on top of it.
In order to ease the transition for existing nipype users, a minimal goal is to ensure that MapNode and iterables+JoinNode functionality persists, in that nodes that fan out from a mapping can be recombined in the same order to produce lists of output.
Discussion was wide-ranging and perhaps at times experiencing a significant gravitas shortfall, but if I remember correctly, we had the following genuine proposals:
Fan-out | Fan-in |
---|---|
map | unmap |
map | collate |
map | isquash (i.e. squash by index) |
map | zap |
split | combine |
splitapply | combine |
I'm sure I'm missing some. I would suggest we flesh out serious suggestions, with one comment per pair, and then have a quick round of voting, where heart (❤️) indicates your preference, thumbs-up (👍) indicates you would be happy enough with it, and thumbs-down (👎) indicates you strongly disapprove and will argue against it if it looks like we're settling on that one.
This is a separate discussion from value-based grouping or non-concatenation-based reductions, although you may want to weigh these options in terms of how easy they'll make naming such operations.
Edit: Please add any additional options you would like to vote on.
This is more for Nipype 2.0, but I'm sticking it here for now and for the 0.1 release. We should push for this once the other 0.1 tasks are done, especially consolidation of the specs API.
Possibly enable human manipulation in some steps; hard to predict all the consequences... ;-)
We should think about the syntax of results for a node and a workflow, with and without mappers. What should the syntax be when a wf is a node? This will be important when adding join.
Real-time monitoring of pydra execution status, and a messaging receiver.
Rarely, the event loop is missing from the Travis thread; see:
https://travis-ci.org/nipype/pydra/jobs/560805001
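If it recurs, one defensive pattern (plain asyncio, nothing pydra-specific assumed) is to create a loop when the current thread has none:

import asyncio

try:
    loop = asyncio.get_event_loop()
except RuntimeError:
    # non-main threads don't get an event loop automatically; make one explicitly
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)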
If I create a wfnd first and later use it within a wf that has different input, it works properly only if wfnd was not run before wf. See the tests:
@pytest.mark.parametrize("plugin", Plugins)
def test_wfasnd_wfndupdate(plugin):
    """ workflow as a node
    workflow-node with one task and no splitter
    wfasnode input is updated to use the main workflow input
    """
    wfnd = Workflow(name="wfnd", input_spec=["x"], x=2)
    wfnd.add(add2(name="add2", x=wfnd.lzin.x))
    wfnd.set_output([("out", wfnd.add2.lzout.out)])

    wf = Workflow(name="wf", input_spec=["x"], x=3)
    wfnd.inputs.x = wf.lzin.x
    wf.add(wfnd)
    wf.set_output([("out", wf.wfnd.lzout.out)])
    wf.plugin = plugin

    with Submitter(plugin=plugin) as sub:
        sub(wf)

    results = wf.result()
    assert results.output.out == 5
    assert wf.output_dir.exists()
@pytest.mark.xfail(reason="wfnd is not updating input for its nodes")
@pytest.mark.parametrize("plugin", Plugins)
def test_wfasnd_wfndupdate_rerun(plugin):
    """ workflow as a node
    workflow-node with one task and no splitter
    wfasnode is run first and later is
    updated to use the main workflow input
    """
    wfnd = Workflow(name="wfnd", input_spec=["x"], x=2)
    wfnd.add(add2(name="add2", x=wfnd.lzin.x))
    wfnd.set_output([("out", wfnd.add2.lzout.out)])
    with Submitter(plugin=plugin) as sub:
        sub(wfnd)

    wf = Workflow(name="wf", input_spec=["x"], x=3)
    # trying to set before adding to wf
    wfnd.inputs.x = wf.lzin.x
    wf.add(wfnd)
    # trying to set after adding to wf
    wf.wfnd.inputs.x = wf.lzin.x
    wf.set_output([("out", wf.wfnd.lzout.out)])
    wf.plugin = plugin

    with Submitter(plugin=plugin) as sub:
        sub(wf)

    results = wf.result()
    assert results.output.out == 5
    assert wf.output_dir.exists()
I haven't debugged too much, but it looks like in the second test the input wf.wfnd.inputs.x is not taken from wf.inputs (it shows x=LF('wf', 'x')), so wf.wfnd.add2 takes the previously set input, i.e. 2.
See, e.g., pydra/pydra/engine/tests/test_newnode.py (line 1295 in ce19c71) and pydra/pydra/engine/tests/test_newnode_neuro.py (lines 34 to 40 in ce19c71).
We should use datalad or OSF to retrieve data to run tests on.
To stay on top of pending deprecations and to not become one of those libraries that causes all downstream packages to emit tons of warnings or fiddle with warnings filters, we should set some Travis jobs to turn warnings into errors. To allow this to be opportunistic, rather than a constant pain, we should mark these jobs as allow_failure.
It looks like Pytest has some mechanisms for modifying warning behavior, which will probably work better than the PYTHONWARNINGS="error" environment variable tried out in #84.
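A minimal sketch of the pytest route, using the standard filterwarnings mark at module scope (just an illustration of the mechanism, not a proposed final config):

import pytest

# escalate every warning raised in this test module to an error;
# pytest's filterwarnings mark mirrors the -W option syntax
pytestmark = pytest.mark.filterwarnings("error")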
The relevant workers should be run as context managers so that they clean up properly on exit.
This could be done within the worker object's source:
with multiprocess.Pool() as p:
    pass

with concurrent.futures.ProcessPoolExecutor() as p:
    pass

with dask.distributed.Client() as c:
    pass
or the worker objects could be made context-aware and run as context managers in Submitter.
with pydra.engine.workers.DaskWorker() as d:
    pass
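A sketch of that second option, making the worker itself context-aware (the attribute and method bodies are assumptions, not the current DaskWorker implementation):

class DaskWorker:
    def __enter__(self):
        from dask.distributed import Client
        self.client = Client()  # hypothetical: start the client on entering the context
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.client.close()  # tear down even if the run raised
        return False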
It would be good to be able to send the data at the end of a run to a specific location, server, or stream. This should also allow renaming outputs using a template, so this feature could combine datasink and rename into a task property.
Data could be streamed as each task finishes rather than waiting for the entire workflow to run; this can enable triggering other services.
https://www.commonwl.org/v1.0/CommandLineTool.html#Running_a_Command
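As a tiny illustration of the renaming-template idea above, using plain str.format (the field names are hypothetical, not an existing pydra property):

template = "{subject}/{task_name}/{output_name}.nii.gz"
# a datasink-like task property could render this once per output
dest = template.format(subject="sub-01", task_name="bet", output_name="brain_mask")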
create docs (hopefully docs can be autogenerated and tested)
We should think about whether this code should raise a pydra exception:
@pydra.to_task
def sum_a(a, b):
    return a + b

task = sum_a()
task()
Once the API is more stable, I can add specific error objects like NodeError and so on.
These should become one object, such that workflows can add Task objects directly. In pydra, a Task can have inputs for the underlying functionality but also for running the task being derived from the dataflow (e.g., environment specification).
All inputs to NodeBase should be keyword arguments; any unmatched arguments will be assumed to be inputs for the underlying function/command.
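A minimal sketch of that pattern (the class and attribute names are assumptions):

class NodeBase:
    def __init__(self, *, name, runnable=None, **inputs):
        # keywords pydra itself understands are consumed explicitly;
        # everything left in **inputs feeds the wrapped function/command
        self.name = name
        self.runnable = runnable
        self.inputs = inputs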
Our PyPI classifiers say we support OSX, so we should add one or more entries to Travis to do so.
Related to #32
Minimal task support is currently merged. It should be extended to provide complete isolation: I should be able to run the pydra workflow itself in one container, and have it launch other tasks inside that container or in remote containers (through slurm, aws, dind, singularity, etc.).
Suppose we have a function f(a, b) and a node:
nodeA = Node(f, mapper=['a', 'b'])
nodeA.inputs = {'a': ['foo1', 'foo2'], 'b': ['bar1', 'bar2', 'bar3']}
Various further steps may occur, but then we have a new function g(d, e) such that we wish to map over e, and iterate in lockstep with the inputs of nodeA.a:
nodeC = Node(g, mapper=('nodeA.a', 'e'))
nodeC.inputs.e = ['baz1', 'baz2']
In this case, the nodeC instances that are downstream from nodeA.a == 'foo1' will receive e == 'baz1'. These will continue to map independently of nodeA.b.
This would be equivalent to a complete mapper:
nodeC = Node(g, mapper=[('nodeA.a', 'e'), 'nodeA.b'])
nodeC.inputs.e = ['baz1', 'baz2']
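For intuition, a tiny pure-Python illustration (not pydra code) of the expansion the complete mapper above implies: the ('nodeA.a', 'e') pair iterates in lockstep, while 'nodeA.b' remains an outer product:

from itertools import product

a = ['foo1', 'foo2']
b = ['bar1', 'bar2', 'bar3']
e = ['baz1', 'baz2']

# ('nodeA.a', 'e') -> zip(a, e); pairing that with 'nodeA.b' -> cross product with b
expansions = [(ai, ei, bi) for (ai, ei), bi in product(zip(a, e), b)]
# six nodeC instances: ('foo1', 'baz1', 'bar1') ... ('foo2', 'baz2', 'bar3')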
If the default out for output is used (without defining output_names; that will come in a PR) and a function returns two values, only one value is saved. See:
@pydra.to_task
def add_vector(x1, y1, x2, y2):
    return (x1 + x2, y1 + y2)

task5 = add_vector(name="add_vect",
                   x1=[10, 20], y1=[1, 2], x2=[10, 20, 30], y2=[10, 20, 30])
task5.split(splitter=[("x1", "y1"), ("x2", "y2")])
task5(plugin="cf")
task5.result()
Return dask support to pydra.
See the xfailing test_wf_2b: https://github.com/nipype/pydra/blob/master/pydra/engine/tests/test_workflow.py#L95 and compare with test_wf_2 and test_wf_2a.
cp.dumps(func) returns different results for different systems/setups. This goes into the task checksum, so it will be a problem for reusing results between systems. There is an xfailing test.
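To make the problem concrete, a sketch of how such a checksum would typically be derived (assuming cp is cloudpickle; the helper is illustrative): if cp.dumps(func) differs across machines, so does the digest, and the cache misses.

import hashlib
import cloudpickle as cp

def func_checksum(func):
    # any byte-level difference in the pickle changes the digest
    return hashlib.sha256(cp.dumps(func)).hexdigest()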
We should determine whether the correct tasks are rerun, using messaging.
If we write a message saying "Loading precomputed results", then we can count the number of such messages for any graph and determine whether the rerun was successful at only recomputing the necessary elements and also respecting the graph hierarchy.
Hooks will allow automations to kick in for specific events.
Possible enumerated locations for hooks:
task_init_pre
task_init_post
run_pre
run_post
run_task_pre
run_task_post
workflow_init_pre
workflow_init_post
workflow_run_pre
workflow_run_post
Hooks can be registered through the Task/Node API.
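A hypothetical sketch of what that registration could look like (add_hook and the callback signature are assumptions, not an existing method):

def log_start(task):
    print(f"starting {task.name}")

task = my_task()
task.add_hook("run_pre", log_start)  # hypothetical: fires just before execution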
...and move some of the instructions (black) from the PR template to the guidelines.
Implement a context manager that provides isolation of inputs/execution environment. This context manager could in principle manage:
Potential future extension: merge the pydra scheduler and plugins into a context manager.
Would be good to start off on the right foot here.
The files are used in test_newnode_neuro.py; for now they can be found at: https://www.dropbox.com/s/rrxc5kegeuzkbtr/fmriprep_test.tar?dl=0
From the #119 example:
import pydra

@pydra.mark.task
def power(a, b):
    return a ** b

@pydra.mark.task
def identity(x):
    return x

wf1 = pydra.Workflow(name="wf1", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf1.add(power(name="power", a=wf1.lzin.a, b=wf1.lzin.b).split(["a", "b"]).combine("a"))
wf1.add(identity(name="identity", x=wf1.power.lzout.out).combine("power.b"))
wf1.set_output({'out': wf1.identity.lzout.out})
wf1(plugin='cf')
This returns:
Result(output=Output(out=[[1, 1]]), runtime=None, errored=False)
but should return (I believe):
Result(output=Output(out=[[1, 1], [4, 8]]), runtime=None, errored=False)
I'm having issues with installing the package and the way we specify the requirements. I wasn't paying attention to this; I know that we used to have requirements.txt, but it has disappeared and now everything is in __about__. It was fine, but I'm doing something wrong in #76, where I've started exposing some objects to the user in the __init__ files. Suggestions will be appreciated!
The test test_cache_propagation1 doesn't work on OSX. I believe the problem is that OSX sets the tempdir differently:
wf.cache_dir is PosixPath('/var/folders/32/823z586933zbm26chcmcqq1m0000gp/T/tmpzmou1eal')
and t1.cache_dir is PosixPath('/private/var/folders/32/823z586933zbm26chcmcqq1m0000gp/T/tmpzmou1eal')
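On macOS, /var is a symlink to /private/var, so one plausible fix (assuming the path comparison is the only problem) is to resolve both paths before comparing:

# pathlib.Path.resolve() follows the /var -> /private/var symlink
assert wf.cache_dir.resolve() == t1.cache_dir.resolve()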
Moved from #100.
Currently, the cache directories for each task come in the form <Class>_xxxxxxx. There is no easy way for a user to track down a particular task's cache_dir, especially in the case of larger workflows.
We should consider making these more identifiable down the line.
These are comments prompted by working through the tutorials, and they can be split out into focused issues as needed, but I just wanted to start making notes. The following is from working through intro_task_state, and from skimming intro_workflow enough to write a couple of example workflows to demonstrate a point below. I will pick up later.
- task1.state.splitter == 'add_two.x' seems to be using the function name as the Task name. Does this cause problems in a workflow where I might use two of the same function-based Tasks?
- .combine(["a", "b"]) would produce something ordered like [(a1, b1), (a2, b1), (a1, b2), (a2, b2)] while .combine(["b", "a"]) would produce [(a1, b1), (a1, b2), (a2, b1), (a2, b2)]. (Or possibly the reverse; I haven't thought it through carefully enough to be sure.)
- .combine("x").combine("n") seems like it should be equivalent to .combine(["x", "n"]). Probably an edge case, but I think making this (and multiple splits on a single task) doable should help clarify the semantics for users, as well as our own thinking. For instance, these should be equivalent (they don't actually run, but hopefully they're clear):
@pydra.mark.task
def power(a, b):
    return a ** b

@pydra.mark.task
def identity(x):
    return x

wf1 = pydra.Workflow(name="wf1", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf1.add(power(name="power", a=wf1.lzin.a, b=wf1.lzin.b).split(["a", "b"]).combine("a"))
wf1.add(identity(name="identity", x=wf1.power.lzout.out).combine("power.b"))
wf1.set_output({'out': wf1.identity.lzout.out})

wf2 = pydra.Workflow(name="wf2", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf2.add(power(name="power", a=wf2.lzin.a, b=wf2.lzin.b).split(["a", "b"]).combine("a").combine("b"))
wf2.set_output({'out': wf2.power.lzout.out})

wf3 = pydra.Workflow(name="wf3", input_spec=["a", "b"], a=[1, 2], b=[2, 3])
wf3.add(power(name="power", a=wf3.lzin.a, b=wf3.lzin.b).split("a").split("b").combine(["a", "b"]))
wf3.set_output({'out': wf3.power.lzout.out})
- State.__str__: it would be nice to show something that actually describes the internal state, rather than just repr(State).
- nr_proc is pretty unintuitive.
- splitter in Task().split(splitter=...): I think it's fine as an internal variable, so we don't need to do a massive search-and-replace, but splitter kind of grates on me. Perhaps split(over=...) or split(by=...), which would read somewhat more like English; I'm not sure how this would feel to non-native speakers, though. Or simply split("a") or split(["a", "b"]) or the like.

What version of Pydra are you using?
0.0.1+298.g759ec5d
Running make test in a fresh conda environment after having installed pydra with pip install --editable .
List the steps you performed that revealed the bug to you.
Include any code samples. Enclose them in triple back-ticks (```)
Like this:
conda create -n pydra python=3.7
conda activate pydra
cd /path/to/pydra
pip install --no-cache-dir --editable .[tests]
make test
Dependencies that are missing:
Should the plugin dependencies be added to the 'tests' extra?
Are we planning to use GitHub Actions? It seems our beta has been approved for the nipype organization.
Right now the test with Audit.ALL gives a pickling error.
I'm not sure if this was finished or not.
Trying to think through the execution model, I think a useful distinction will be between graph nodes and execution nodes. Graph nodes would be aware of the graph structure and their own state:
class Node:
    def __init__(self, runnable, inputs=None, split=None, combine=None):
        self._parents = []
        self._runnable = runnable
        self._inputs = inputs or {}
        self._state = State(self, split, combine)

    def add_parent(self, node):
        self._parents.append(node)

    @property
    def state(self):
        return self._state.resolve([parent.state for parent in self._parents])

    def get_execution_nodes(self):
        return [ExecutionNode(self._runnable, inputs={**self._inputs, **state_inputs})
                for state_inputs in self.state.generate_inputs()]

    def run(self):
        for execnode in self.get_execution_nodes():
            if execnode.ready:
                execnode.run()
I would treat ExecutionNodes ephemerally. Their inputs should implement some kind of future/promise interface, and their outputs should be loadable from file, such that:
>>> execnode1 = node.get_execution_nodes()[0]
>>> execnode1.run()
>>> execnode1.results
{'out_a': <a>,
 'out_b': <b>}
>>> execnode2 = node.get_execution_nodes()[0]
>>> execnode2 is execnode1
False
>>> execnode2.inputs.in_a is execnode1.inputs.in_a
True
>>> execnode2.results
{'out_a': <a>,
 'out_b': <b>}
>>> os.removedirs(execnode2._results_path)
>>> execnode1.results
RuntimeError("Node not run")
This approach to Node leads to each node storing a partial State object that is only fully resolved at runtime, when all of its parents are known. If we have a sensible pairwise combination semantics for state, we can do the following:
class State:
    @classmethod
    def merge(cls, state_a, state_b, context):
        if state_a is state_b:
            return state_a
        if not state_b.axes:
            return state_a
        if not state_a.axes:
            return state_b
        # create a new state from two states, using current state's splits
        # to assist in reordering

    def resolve(self, states):
        if states == []:
            return self
        state = states[0]
        for tmp_state in states[1:] + [self]:
            state = self.merge(state, tmp_state, self._split)
        return state
It needs fleshing out, but the idea is that resolve will return the most recently changed State object, where changing means adding, removing, or reordering axes. Calling for inputs from the states will check to make sure that the shapes of inputs match, when needed.
I have some other thoughts as to how execution falls out of here, but I'll let those percolate for a bit. In the meantime, what do y'all think? Does this simplify/complicate how we're currently thinking of nodes? I do think it's a significantly different approach to state, so I don't want to charge ahead without feedback.